aboutsummaryrefslogtreecommitdiff
path: root/common/network-shuffle
diff options
context:
space:
mode:
authorSean Owen <sowen@cloudera.com>2017-02-16 12:32:45 +0000
committerSean Owen <sowen@cloudera.com>2017-02-16 12:32:45 +0000
commit0e2405490f2056728d1353abbac6f3ea177ae533 (patch)
tree1a9ec960faec7abcb8d8fbac43b6a6dc633d2297 /common/network-shuffle
parent3871d94a695d47169720e877f77ff1e4bede43ee (diff)
downloadspark-0e2405490f2056728d1353abbac6f3ea177ae533.tar.gz
spark-0e2405490f2056728d1353abbac6f3ea177ae533.tar.bz2
spark-0e2405490f2056728d1353abbac6f3ea177ae533.zip
[SPARK-19550][BUILD][CORE][WIP] Remove Java 7 support
- Move external/java8-tests tests into core, streaming, sql and remove - Remove MaxPermGen and related options - Fix some reflection / TODOs around Java 8+ methods - Update doc references to 1.7/1.8 differences - Remove Java 7/8 related build profiles - Update some plugins for better Java 8 compatibility - Fix a few Java-related warnings For the future: - Update Java 8 examples to fully use Java 8 - Update Java tests to use lambdas for simplicity - Update Java internal implementations to use lambdas ## How was this patch tested? Existing tests Author: Sean Owen <sowen@cloudera.com> Closes #16871 from srowen/SPARK-19493.
Diffstat (limited to 'common/network-shuffle')
-rw-r--r--common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java8
-rw-r--r--common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java7
-rw-r--r--common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java21
-rw-r--r--common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockFetcher.java9
4 files changed, 13 insertions, 32 deletions
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java
index 6e02430a8e..6daf9609d7 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java
@@ -190,12 +190,8 @@ public class ExternalShuffleBlockHandler extends RpcHandler {
allMetrics.put("openBlockRequestLatencyMillis", openBlockRequestLatencyMillis);
allMetrics.put("registerExecutorRequestLatencyMillis", registerExecutorRequestLatencyMillis);
allMetrics.put("blockTransferRateBytes", blockTransferRateBytes);
- allMetrics.put("registeredExecutorsSize", new Gauge<Integer>() {
- @Override
- public Integer getValue() {
- return blockManager.getRegisteredExecutorsSize();
- }
- });
+ allMetrics.put("registeredExecutorsSize",
+ (Gauge<Integer>) () -> blockManager.getRegisteredExecutorsSize());
}
@Override
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java
index 25e9abde70..62d58aba4c 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java
@@ -205,12 +205,7 @@ public class ExternalShuffleBlockResolver {
logger.info("Cleaning up executor {}'s {} local dirs", fullId, executor.localDirs.length);
// Execute the actual deletion in a different thread, as it may take some time.
- directoryCleaner.execute(new Runnable() {
- @Override
- public void run() {
- deleteExecutorDirs(executor.localDirs);
- }
- });
+ directoryCleaner.execute(() -> deleteExecutorDirs(executor.localDirs));
}
}
}
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java
index 8c0c400966..2c5827bf7d 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java
@@ -82,23 +82,19 @@ public class ExternalShuffleClient extends ShuffleClient {
@Override
public void fetchBlocks(
- final String host,
- final int port,
- final String execId,
+ String host,
+ int port,
+ String execId,
String[] blockIds,
BlockFetchingListener listener) {
checkInit();
logger.debug("External shuffle fetch from {}:{} (executor id {})", host, port, execId);
try {
RetryingBlockFetcher.BlockFetchStarter blockFetchStarter =
- new RetryingBlockFetcher.BlockFetchStarter() {
- @Override
- public void createAndStart(String[] blockIds, BlockFetchingListener listener)
- throws IOException, InterruptedException {
+ (blockIds1, listener1) -> {
TransportClient client = clientFactory.createClient(host, port);
- new OneForOneBlockFetcher(client, appId, execId, blockIds, listener).start();
- }
- };
+ new OneForOneBlockFetcher(client, appId, execId, blockIds1, listener1).start();
+ };
int maxRetries = conf.maxIORetries();
if (maxRetries > 0) {
@@ -131,12 +127,9 @@ public class ExternalShuffleClient extends ShuffleClient {
String execId,
ExecutorShuffleInfo executorInfo) throws IOException, InterruptedException {
checkInit();
- TransportClient client = clientFactory.createUnmanagedClient(host, port);
- try {
+ try (TransportClient client = clientFactory.createUnmanagedClient(host, port)) {
ByteBuffer registerMessage = new RegisterExecutor(appId, execId, executorInfo).toByteBuffer();
client.sendRpcSync(registerMessage, 5000 /* timeoutMs */);
- } finally {
- client.close();
}
}
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockFetcher.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockFetcher.java
index 5be855048e..f309dda8af 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockFetcher.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockFetcher.java
@@ -164,12 +164,9 @@ public class RetryingBlockFetcher {
logger.info("Retrying fetch ({}/{}) for {} outstanding blocks after {} ms",
retryCount, maxRetries, outstandingBlocksIds.size(), retryWaitTime);
- executorService.submit(new Runnable() {
- @Override
- public void run() {
- Uninterruptibles.sleepUninterruptibly(retryWaitTime, TimeUnit.MILLISECONDS);
- fetchAllOutstanding();
- }
+ executorService.submit(() -> {
+ Uninterruptibles.sleepUninterruptibly(retryWaitTime, TimeUnit.MILLISECONDS);
+ fetchAllOutstanding();
});
}