diff options
author | Sean Owen <sowen@cloudera.com> | 2017-02-19 09:42:50 -0800 |
---|---|---|
committer | Sean Owen <sowen@cloudera.com> | 2017-02-19 09:42:50 -0800 |
commit | 1487c9af20a333ead55955acf4c0aa323bea0d07 (patch) | |
tree | 5f47daa77e0f73da1e009cc3dcf0a5c0073246aa /common/network-shuffle | |
parent | de14d35f77071932963a994fac5aec0e5df838a1 (diff) | |
download | spark-1487c9af20a333ead55955acf4c0aa323bea0d07.tar.gz spark-1487c9af20a333ead55955acf4c0aa323bea0d07.tar.bz2 spark-1487c9af20a333ead55955acf4c0aa323bea0d07.zip |
[SPARK-19534][TESTS] Convert Java tests to use lambdas, Java 8 features
## What changes were proposed in this pull request?
Convert tests to use Java 8 lambdas, and modest related fixes to surrounding code.
## How was this patch tested?
Jenkins tests
Author: Sean Owen <sowen@cloudera.com>
Closes #16964 from srowen/SPARK-19534.
Diffstat (limited to 'common/network-shuffle')
6 files changed, 87 insertions, 110 deletions
diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/sasl/SaslIntegrationSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/sasl/SaslIntegrationSuite.java index 52f50a3409..c0e170e5b9 100644 --- a/common/network-shuffle/src/test/java/org/apache/spark/network/sasl/SaslIntegrationSuite.java +++ b/common/network-shuffle/src/test/java/org/apache/spark/network/sasl/SaslIntegrationSuite.java @@ -19,11 +19,11 @@ package org.apache.spark.network.sasl; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.ArrayList; import java.util.Arrays; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicReference; -import com.google.common.collect.Lists; import org.junit.After; import org.junit.AfterClass; import org.junit.BeforeClass; @@ -38,7 +38,6 @@ import org.apache.spark.network.buffer.ManagedBuffer; import org.apache.spark.network.client.ChunkReceivedCallback; import org.apache.spark.network.client.RpcResponseCallback; import org.apache.spark.network.client.TransportClient; -import org.apache.spark.network.client.TransportClientBootstrap; import org.apache.spark.network.client.TransportClientFactory; import org.apache.spark.network.server.OneForOneStreamManager; import org.apache.spark.network.server.RpcHandler; @@ -105,8 +104,7 @@ public class SaslIntegrationSuite { @Test public void testGoodClient() throws IOException, InterruptedException { clientFactory = context.createClientFactory( - Lists.<TransportClientBootstrap>newArrayList( - new SaslClientBootstrap(conf, "app-1", secretKeyHolder))); + Arrays.asList(new SaslClientBootstrap(conf, "app-1", secretKeyHolder))); TransportClient client = clientFactory.createClient(TestUtils.getLocalHost(), server.getPort()); String msg = "Hello, World!"; @@ -120,8 +118,7 @@ public class SaslIntegrationSuite { when(badKeyHolder.getSaslUser(anyString())).thenReturn("other-app"); when(badKeyHolder.getSecretKey(anyString())).thenReturn("wrong-password"); clientFactory = context.createClientFactory( - Lists.<TransportClientBootstrap>newArrayList( - new SaslClientBootstrap(conf, "unknown-app", badKeyHolder))); + Arrays.asList(new SaslClientBootstrap(conf, "unknown-app", badKeyHolder))); try { // Bootstrap should fail on startup. @@ -134,8 +131,7 @@ public class SaslIntegrationSuite { @Test public void testNoSaslClient() throws IOException, InterruptedException { - clientFactory = context.createClientFactory( - Lists.<TransportClientBootstrap>newArrayList()); + clientFactory = context.createClientFactory(new ArrayList<>()); TransportClient client = clientFactory.createClient(TestUtils.getLocalHost(), server.getPort()); try { @@ -159,15 +155,11 @@ public class SaslIntegrationSuite { RpcHandler handler = new TestRpcHandler(); TransportContext context = new TransportContext(conf, handler); clientFactory = context.createClientFactory( - Lists.<TransportClientBootstrap>newArrayList( - new SaslClientBootstrap(conf, "app-1", secretKeyHolder))); - TransportServer server = context.createServer(); - try { + Arrays.asList(new SaslClientBootstrap(conf, "app-1", secretKeyHolder))); + try (TransportServer server = context.createServer()) { clientFactory.createClient(TestUtils.getLocalHost(), server.getPort()); } catch (Exception e) { assertTrue(e.getMessage(), e.getMessage().contains("Digest-challenge format violation")); - } finally { - server.close(); } } @@ -191,14 +183,13 @@ public class SaslIntegrationSuite { try { // Create a client, and make a request to fetch blocks from a different app. clientFactory = blockServerContext.createClientFactory( - Lists.<TransportClientBootstrap>newArrayList( - new SaslClientBootstrap(conf, "app-1", secretKeyHolder))); + Arrays.asList(new SaslClientBootstrap(conf, "app-1", secretKeyHolder))); client1 = clientFactory.createClient(TestUtils.getLocalHost(), blockServer.getPort()); - final AtomicReference<Throwable> exception = new AtomicReference<>(); + AtomicReference<Throwable> exception = new AtomicReference<>(); - final CountDownLatch blockFetchLatch = new CountDownLatch(1); + CountDownLatch blockFetchLatch = new CountDownLatch(1); BlockFetchingListener listener = new BlockFetchingListener() { @Override public void onBlockFetchSuccess(String blockId, ManagedBuffer data) { @@ -235,12 +226,11 @@ public class SaslIntegrationSuite { // Create a second client, authenticated with a different app ID, and try to read from // the stream created for the previous app. clientFactory2 = blockServerContext.createClientFactory( - Lists.<TransportClientBootstrap>newArrayList( - new SaslClientBootstrap(conf, "app-2", secretKeyHolder))); + Arrays.asList(new SaslClientBootstrap(conf, "app-2", secretKeyHolder))); client2 = clientFactory2.createClient(TestUtils.getLocalHost(), blockServer.getPort()); - final CountDownLatch chunkReceivedLatch = new CountDownLatch(1); + CountDownLatch chunkReceivedLatch = new CountDownLatch(1); ChunkReceivedCallback callback = new ChunkReceivedCallback() { @Override public void onSuccess(int chunkIndex, ManagedBuffer buffer) { @@ -284,7 +274,7 @@ public class SaslIntegrationSuite { } } - private void checkSecurityException(Throwable t) { + private static void checkSecurityException(Throwable t) { assertNotNull("No exception was caught.", t); assertTrue("Expected SecurityException.", t.getMessage().contains(SecurityException.class.getName())); diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandlerSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandlerSuite.java index c036bc2e8d..e47a72c9d1 100644 --- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandlerSuite.java +++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandlerSuite.java @@ -93,7 +93,7 @@ public class ExternalShuffleBlockHandlerSuite { ArgumentCaptor<ByteBuffer> response = ArgumentCaptor.forClass(ByteBuffer.class); verify(callback, times(1)).onSuccess(response.capture()); - verify(callback, never()).onFailure((Throwable) any()); + verify(callback, never()).onFailure(any()); StreamHandle handle = (StreamHandle) BlockTransferMessage.Decoder.fromByteBuffer(response.getValue()); diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleCleanupSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleCleanupSuite.java index 7757500b41..47c087088a 100644 --- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleCleanupSuite.java +++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleCleanupSuite.java @@ -60,12 +60,10 @@ public class ExternalShuffleCleanupSuite { public void cleanupUsesExecutor() throws IOException { TestShuffleDataContext dataContext = createSomeData(); - final AtomicBoolean cleanupCalled = new AtomicBoolean(false); + AtomicBoolean cleanupCalled = new AtomicBoolean(false); // Executor which does nothing to ensure we're actually using it. - Executor noThreadExecutor = new Executor() { - @Override public void execute(Runnable runnable) { cleanupCalled.set(true); } - }; + Executor noThreadExecutor = runnable -> cleanupCalled.set(true); ExternalShuffleBlockResolver manager = new ExternalShuffleBlockResolver(conf, null, noThreadExecutor); diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java index 88de6fb83c..b8ae04eefb 100644 --- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java +++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java @@ -19,6 +19,7 @@ package org.apache.spark.network.shuffle; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.Arrays; import java.util.Collections; import java.util.HashSet; import java.util.LinkedList; @@ -29,7 +30,6 @@ import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Lists; import com.google.common.collect.Sets; import org.junit.After; import org.junit.AfterClass; @@ -173,7 +173,7 @@ public class ExternalShuffleIntegrationSuite { FetchResult exec0Fetch = fetchBlocks("exec-0", new String[] { "shuffle_0_0_0" }); assertEquals(Sets.newHashSet("shuffle_0_0_0"), exec0Fetch.successBlocks); assertTrue(exec0Fetch.failedBlocks.isEmpty()); - assertBufferListsEqual(exec0Fetch.buffers, Lists.newArrayList(exec0Blocks[0])); + assertBufferListsEqual(exec0Fetch.buffers, Arrays.asList(exec0Blocks[0])); exec0Fetch.releaseBuffers(); } @@ -185,7 +185,7 @@ public class ExternalShuffleIntegrationSuite { assertEquals(Sets.newHashSet("shuffle_0_0_0", "shuffle_0_0_1", "shuffle_0_0_2"), exec0Fetch.successBlocks); assertTrue(exec0Fetch.failedBlocks.isEmpty()); - assertBufferListsEqual(exec0Fetch.buffers, Lists.newArrayList(exec0Blocks)); + assertBufferListsEqual(exec0Fetch.buffers, Arrays.asList(exec0Blocks)); exec0Fetch.releaseBuffers(); } @@ -241,7 +241,7 @@ public class ExternalShuffleIntegrationSuite { assertEquals(Sets.newHashSet("shuffle_1_0_0", "shuffle_1_0_1"), execFetch.failedBlocks); } - private void registerExecutor(String executorId, ExecutorShuffleInfo executorInfo) + private static void registerExecutor(String executorId, ExecutorShuffleInfo executorInfo) throws IOException, InterruptedException { ExternalShuffleClient client = new ExternalShuffleClient(conf, null, false); client.init(APP_ID); @@ -249,7 +249,7 @@ public class ExternalShuffleIntegrationSuite { executorId, executorInfo); } - private void assertBufferListsEqual(List<ManagedBuffer> list0, List<byte[]> list1) + private static void assertBufferListsEqual(List<ManagedBuffer> list0, List<byte[]> list1) throws Exception { assertEquals(list0.size(), list1.size()); for (int i = 0; i < list0.size(); i ++) { @@ -257,7 +257,8 @@ public class ExternalShuffleIntegrationSuite { } } - private void assertBuffersEqual(ManagedBuffer buffer0, ManagedBuffer buffer1) throws Exception { + private static void assertBuffersEqual(ManagedBuffer buffer0, ManagedBuffer buffer1) + throws Exception { ByteBuffer nio0 = buffer0.nioByteBuffer(); ByteBuffer nio1 = buffer1.nioByteBuffer(); diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/OneForOneBlockFetcherSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/OneForOneBlockFetcherSuite.java index 2590b9ce4c..3e51fea3cf 100644 --- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/OneForOneBlockFetcherSuite.java +++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/OneForOneBlockFetcherSuite.java @@ -25,8 +25,6 @@ import java.util.concurrent.atomic.AtomicInteger; import com.google.common.collect.Maps; import io.netty.buffer.Unpooled; import org.junit.Test; -import org.mockito.invocation.InvocationOnMock; -import org.mockito.stubbing.Answer; import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; @@ -85,8 +83,8 @@ public class OneForOneBlockFetcherSuite { // Each failure will cause a failure to be invoked in all remaining block fetches. verify(listener, times(1)).onBlockFetchSuccess("b0", blocks.get("b0")); - verify(listener, times(1)).onBlockFetchFailure(eq("b1"), (Throwable) any()); - verify(listener, times(2)).onBlockFetchFailure(eq("b2"), (Throwable) any()); + verify(listener, times(1)).onBlockFetchFailure(eq("b1"), any()); + verify(listener, times(2)).onBlockFetchFailure(eq("b2"), any()); } @Test @@ -100,15 +98,15 @@ public class OneForOneBlockFetcherSuite { // We may call both success and failure for the same block. verify(listener, times(1)).onBlockFetchSuccess("b0", blocks.get("b0")); - verify(listener, times(1)).onBlockFetchFailure(eq("b1"), (Throwable) any()); + verify(listener, times(1)).onBlockFetchFailure(eq("b1"), any()); verify(listener, times(1)).onBlockFetchSuccess("b2", blocks.get("b2")); - verify(listener, times(1)).onBlockFetchFailure(eq("b2"), (Throwable) any()); + verify(listener, times(1)).onBlockFetchFailure(eq("b2"), any()); } @Test public void testEmptyBlockFetch() { try { - fetchBlocks(Maps.<String, ManagedBuffer>newLinkedHashMap()); + fetchBlocks(Maps.newLinkedHashMap()); fail(); } catch (IllegalArgumentException e) { assertEquals("Zero-sized blockIds array", e.getMessage()); @@ -123,52 +121,46 @@ public class OneForOneBlockFetcherSuite { * * If a block's buffer is "null", an exception will be thrown instead. */ - private BlockFetchingListener fetchBlocks(final LinkedHashMap<String, ManagedBuffer> blocks) { + private static BlockFetchingListener fetchBlocks(LinkedHashMap<String, ManagedBuffer> blocks) { TransportClient client = mock(TransportClient.class); BlockFetchingListener listener = mock(BlockFetchingListener.class); - final String[] blockIds = blocks.keySet().toArray(new String[blocks.size()]); + String[] blockIds = blocks.keySet().toArray(new String[blocks.size()]); OneForOneBlockFetcher fetcher = new OneForOneBlockFetcher(client, "app-id", "exec-id", blockIds, listener); - // Respond to the "OpenBlocks" message with an appropirate ShuffleStreamHandle with streamId 123 - doAnswer(new Answer<Void>() { - @Override - public Void answer(InvocationOnMock invocationOnMock) throws Throwable { - BlockTransferMessage message = BlockTransferMessage.Decoder.fromByteBuffer( - (ByteBuffer) invocationOnMock.getArguments()[0]); - RpcResponseCallback callback = (RpcResponseCallback) invocationOnMock.getArguments()[1]; - callback.onSuccess(new StreamHandle(123, blocks.size()).toByteBuffer()); - assertEquals(new OpenBlocks("app-id", "exec-id", blockIds), message); - return null; - } + // Respond to the "OpenBlocks" message with an appropriate ShuffleStreamHandle with streamId 123 + doAnswer(invocationOnMock -> { + BlockTransferMessage message = BlockTransferMessage.Decoder.fromByteBuffer( + (ByteBuffer) invocationOnMock.getArguments()[0]); + RpcResponseCallback callback = (RpcResponseCallback) invocationOnMock.getArguments()[1]; + callback.onSuccess(new StreamHandle(123, blocks.size()).toByteBuffer()); + assertEquals(new OpenBlocks("app-id", "exec-id", blockIds), message); + return null; }).when(client).sendRpc(any(ByteBuffer.class), any(RpcResponseCallback.class)); // Respond to each chunk request with a single buffer from our blocks array. - final AtomicInteger expectedChunkIndex = new AtomicInteger(0); - final Iterator<ManagedBuffer> blockIterator = blocks.values().iterator(); - doAnswer(new Answer<Void>() { - @Override - public Void answer(InvocationOnMock invocation) throws Throwable { - try { - long streamId = (Long) invocation.getArguments()[0]; - int myChunkIndex = (Integer) invocation.getArguments()[1]; - assertEquals(123, streamId); - assertEquals(expectedChunkIndex.getAndIncrement(), myChunkIndex); - - ChunkReceivedCallback callback = (ChunkReceivedCallback) invocation.getArguments()[2]; - ManagedBuffer result = blockIterator.next(); - if (result != null) { - callback.onSuccess(myChunkIndex, result); - } else { - callback.onFailure(myChunkIndex, new RuntimeException("Failed " + myChunkIndex)); - } - } catch (Exception e) { - e.printStackTrace(); - fail("Unexpected failure"); + AtomicInteger expectedChunkIndex = new AtomicInteger(0); + Iterator<ManagedBuffer> blockIterator = blocks.values().iterator(); + doAnswer(invocation -> { + try { + long streamId = (Long) invocation.getArguments()[0]; + int myChunkIndex = (Integer) invocation.getArguments()[1]; + assertEquals(123, streamId); + assertEquals(expectedChunkIndex.getAndIncrement(), myChunkIndex); + + ChunkReceivedCallback callback = (ChunkReceivedCallback) invocation.getArguments()[2]; + ManagedBuffer result = blockIterator.next(); + if (result != null) { + callback.onSuccess(myChunkIndex, result); + } else { + callback.onFailure(myChunkIndex, new RuntimeException("Failed " + myChunkIndex)); } - return null; + } catch (Exception e) { + e.printStackTrace(); + fail("Unexpected failure"); } - }).when(client).fetchChunk(anyLong(), anyInt(), (ChunkReceivedCallback) any()); + return null; + }).when(client).fetchChunk(anyLong(), anyInt(), any()); fetcher.start(); return listener; diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RetryingBlockFetcherSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RetryingBlockFetcherSuite.java index 6db71eea6e..a530e16734 100644 --- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RetryingBlockFetcherSuite.java +++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RetryingBlockFetcherSuite.java @@ -28,7 +28,6 @@ import java.util.Map; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Sets; import org.junit.Test; -import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; import org.mockito.stubbing.Stubber; @@ -84,7 +83,7 @@ public class RetryingBlockFetcherSuite { performInteractions(interactions, listener); - verify(listener).onBlockFetchFailure(eq("b0"), (Throwable) any()); + verify(listener).onBlockFetchFailure(eq("b0"), any()); verify(listener).onBlockFetchSuccess("b1", block1); verifyNoMoreInteractions(listener); } @@ -190,7 +189,7 @@ public class RetryingBlockFetcherSuite { performInteractions(interactions, listener); verify(listener, timeout(5000)).onBlockFetchSuccess("b0", block0); - verify(listener, timeout(5000)).onBlockFetchFailure(eq("b1"), (Throwable) any()); + verify(listener, timeout(5000)).onBlockFetchFailure(eq("b1"), any()); verifyNoMoreInteractions(listener); } @@ -220,7 +219,7 @@ public class RetryingBlockFetcherSuite { performInteractions(interactions, listener); verify(listener, timeout(5000)).onBlockFetchSuccess("b0", block0); - verify(listener, timeout(5000)).onBlockFetchFailure(eq("b1"), (Throwable) any()); + verify(listener, timeout(5000)).onBlockFetchFailure(eq("b1"), any()); verify(listener, timeout(5000)).onBlockFetchSuccess("b2", block2); verifyNoMoreInteractions(listener); } @@ -249,40 +248,37 @@ public class RetryingBlockFetcherSuite { Stubber stub = null; // Contains all blockIds that are referenced across all interactions. - final LinkedHashSet<String> blockIds = Sets.newLinkedHashSet(); + LinkedHashSet<String> blockIds = Sets.newLinkedHashSet(); - for (final Map<String, Object> interaction : interactions) { + for (Map<String, Object> interaction : interactions) { blockIds.addAll(interaction.keySet()); - Answer<Void> answer = new Answer<Void>() { - @Override - public Void answer(InvocationOnMock invocationOnMock) throws Throwable { - try { - // Verify that the RetryingBlockFetcher requested the expected blocks. - String[] requestedBlockIds = (String[]) invocationOnMock.getArguments()[0]; - String[] desiredBlockIds = interaction.keySet().toArray(new String[interaction.size()]); - assertArrayEquals(desiredBlockIds, requestedBlockIds); - - // Now actually invoke the success/failure callbacks on each block. - BlockFetchingListener retryListener = - (BlockFetchingListener) invocationOnMock.getArguments()[1]; - for (Map.Entry<String, Object> block : interaction.entrySet()) { - String blockId = block.getKey(); - Object blockValue = block.getValue(); - - if (blockValue instanceof ManagedBuffer) { - retryListener.onBlockFetchSuccess(blockId, (ManagedBuffer) blockValue); - } else if (blockValue instanceof Exception) { - retryListener.onBlockFetchFailure(blockId, (Exception) blockValue); - } else { - fail("Can only handle ManagedBuffers and Exceptions, got " + blockValue); - } + Answer<Void> answer = invocationOnMock -> { + try { + // Verify that the RetryingBlockFetcher requested the expected blocks. + String[] requestedBlockIds = (String[]) invocationOnMock.getArguments()[0]; + String[] desiredBlockIds = interaction.keySet().toArray(new String[interaction.size()]); + assertArrayEquals(desiredBlockIds, requestedBlockIds); + + // Now actually invoke the success/failure callbacks on each block. + BlockFetchingListener retryListener = + (BlockFetchingListener) invocationOnMock.getArguments()[1]; + for (Map.Entry<String, Object> block : interaction.entrySet()) { + String blockId = block.getKey(); + Object blockValue = block.getValue(); + + if (blockValue instanceof ManagedBuffer) { + retryListener.onBlockFetchSuccess(blockId, (ManagedBuffer) blockValue); + } else if (blockValue instanceof Exception) { + retryListener.onBlockFetchFailure(blockId, (Exception) blockValue); + } else { + fail("Can only handle ManagedBuffers and Exceptions, got " + blockValue); } - return null; - } catch (Throwable e) { - e.printStackTrace(); - throw e; } + return null; + } catch (Throwable e) { + e.printStackTrace(); + throw e; } }; @@ -295,7 +291,7 @@ public class RetryingBlockFetcherSuite { } assertNotNull(stub); - stub.when(fetchStarter).createAndStart((String[]) any(), (BlockFetchingListener) anyObject()); + stub.when(fetchStarter).createAndStart(any(), anyObject()); String[] blockIdArray = blockIds.toArray(new String[blockIds.size()]); new RetryingBlockFetcher(conf, fetchStarter, blockIdArray, listener).start(); } |