aboutsummaryrefslogtreecommitdiff
path: root/common
diff options
context:
space:
mode:
authorJosh Rosen <joshrosen@databricks.com>2017-02-13 11:04:27 -0800
committerCheng Lian <lian@databricks.com>2017-02-13 11:04:27 -0800
commit1c4d10b10c78d138b55e381ec6828e04fef70d6f (patch)
treea10bccbc06fcf48b08a4c274363252db135e8b0c /common
parentab88b2410623e5fdb06d558017bd6d50220e466a (diff)
downloadspark-1c4d10b10c78d138b55e381ec6828e04fef70d6f.tar.gz
spark-1c4d10b10c78d138b55e381ec6828e04fef70d6f.tar.bz2
spark-1c4d10b10c78d138b55e381ec6828e04fef70d6f.zip
[SPARK-19529] TransportClientFactory.createClient() shouldn't call awaitUninterruptibly()
## What changes were proposed in this pull request? This patch replaces a single `awaitUninterruptibly()` call with a plain `await()` call in Spark's `network-common` library in order to fix a bug which may cause tasks to be uncancellable. In Spark's Netty RPC layer, `TransportClientFactory.createClient()` calls `awaitUninterruptibly()` on a Netty future while waiting for a connection to be established. This creates problem when a Spark task is interrupted while blocking in this call (which can happen in the event of a slow connection which will eventually time out). This has bad impacts on task cancellation when `interruptOnCancel = true`. As an example of the impact of this problem, I experienced significant numbers of uncancellable "zombie tasks" on a production cluster where several tasks were blocked trying to connect to a dead shuffle server and then continued running as zombies after I cancelled the associated Spark stage. The zombie tasks ran for several minutes with the following stack: ``` java.lang.Object.wait(Native Method) java.lang.Object.wait(Object.java:460) io.netty.util.concurrent.DefaultPromise.await0(DefaultPromise.java:607) io.netty.util.concurrent.DefaultPromise.awaitUninterruptibly(DefaultPromise.java:301) org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:224) org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:179) => holding Monitor(java.lang.Object1849476028}) org.apache.spark.network.shuffle.ExternalShuffleClient$1.createAndStart(ExternalShuffleClient.java:105) org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140) org.apache.spark.network.shuffle.RetryingBlockFetcher.start(RetryingBlockFetcher.java:120) org.apache.spark.network.shuffle.ExternalShuffleClient.fetchBlocks(ExternalShuffleClient.java:114) org.apache.spark.storage.ShuffleBlockFetcherIterator.sendRequest(ShuffleBlockFetcherIterator.scala:169) org.apache.spark.storage.ShuffleBlockFetcherIterator.fetchUpToMaxBytes(ShuffleBlockFetcherIterator.scala: 350) org.apache.spark.storage.ShuffleBlockFetcherIterator.initialize(ShuffleBlockFetcherIterator.scala:286) org.apache.spark.storage.ShuffleBlockFetcherIterator.<init>(ShuffleBlockFetcherIterator.scala:120) org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:45) org.apache.spark.sql.execution.ShuffledRowRDD.compute(ShuffledRowRDD.scala:169) org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) org.apache.spark.rdd.RDD.iterator(RDD.scala:287) [...] ``` As far as I can tell, `awaitUninterruptibly()` might have been used in order to avoid having to declare that methods throw `InterruptedException` (this code is written in Java, hence the need to use checked exceptions). This patch simply replaces this with a regular, interruptible `await()` call,. This required several interface changes to declare a new checked exception (these are internal interfaces, though, and this change doesn't significantly impact binary compatibility). An alternative approach would be to wrap `InterruptedException` into `IOException` in order to avoid having to change interfaces. The problem with this approach is that the `network-shuffle` project's `RetryingBlockFetcher` code treats `IOExceptions` as transitive failures when deciding whether to retry fetches, so throwing a wrapped `IOException` might cause an interrupted shuffle fetch to be retried, further prolonging the lifetime of a cancelled zombie task. Note that there are three other `awaitUninterruptibly()` in the codebase, but those calls have a hard 10 second timeout and are waiting on a `close()` operation which is expected to complete near instantaneously, so the impact of uninterruptibility there is much smaller. ## How was this patch tested? Manually. Author: Josh Rosen <joshrosen@databricks.com> Closes #16866 from JoshRosen/SPARK-19529.
Diffstat (limited to 'common')
-rw-r--r--common/network-common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java10
-rw-r--r--common/network-common/src/test/java/org/apache/spark/network/TransportClientFactorySuite.java6
-rw-r--r--common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java4
-rw-r--r--common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockFetcher.java3
-rw-r--r--common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/mesos/MesosExternalShuffleClient.java2
-rw-r--r--common/network-shuffle/src/test/java/org/apache/spark/network/sasl/SaslIntegrationSuite.java4
-rw-r--r--common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java2
-rw-r--r--common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleSecuritySuite.java7
-rw-r--r--common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RetryingBlockFetcherSuite.java16
9 files changed, 30 insertions, 24 deletions
diff --git a/common/network-common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java b/common/network-common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java
index cb10edff65..b50e043d5c 100644
--- a/common/network-common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java
@@ -122,7 +122,8 @@ public class TransportClientFactory implements Closeable {
*
* Concurrency: This method is safe to call from multiple threads.
*/
- public TransportClient createClient(String remoteHost, int remotePort) throws IOException {
+ public TransportClient createClient(String remoteHost, int remotePort)
+ throws IOException, InterruptedException {
// Get connection from the connection pool first.
// If it is not found or not active, create a new one.
// Use unresolved address here to avoid DNS resolution each time we creates a client.
@@ -190,13 +191,14 @@ public class TransportClientFactory implements Closeable {
* As with {@link #createClient(String, int)}, this method is blocking.
*/
public TransportClient createUnmanagedClient(String remoteHost, int remotePort)
- throws IOException {
+ throws IOException, InterruptedException {
final InetSocketAddress address = new InetSocketAddress(remoteHost, remotePort);
return createClient(address);
}
/** Create a completely new {@link TransportClient} to the remote address. */
- private TransportClient createClient(InetSocketAddress address) throws IOException {
+ private TransportClient createClient(InetSocketAddress address)
+ throws IOException, InterruptedException {
logger.debug("Creating new connection to {}", address);
Bootstrap bootstrap = new Bootstrap();
@@ -223,7 +225,7 @@ public class TransportClientFactory implements Closeable {
// Connect to the remote server
long preConnect = System.nanoTime();
ChannelFuture cf = bootstrap.connect(address);
- if (!cf.awaitUninterruptibly(conf.connectionTimeoutMs())) {
+ if (!cf.await(conf.connectionTimeoutMs())) {
throw new IOException(
String.format("Connecting to %s timed out (%s ms)", address, conf.connectionTimeoutMs()));
} else if (cf.cause() != null) {
diff --git a/common/network-common/src/test/java/org/apache/spark/network/TransportClientFactorySuite.java b/common/network-common/src/test/java/org/apache/spark/network/TransportClientFactorySuite.java
index f54a64cb0f..205ab88c84 100644
--- a/common/network-common/src/test/java/org/apache/spark/network/TransportClientFactorySuite.java
+++ b/common/network-common/src/test/java/org/apache/spark/network/TransportClientFactorySuite.java
@@ -99,6 +99,8 @@ public class TransportClientFactorySuite {
clients.add(client);
} catch (IOException e) {
failed.incrementAndGet();
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
}
}
};
@@ -142,7 +144,7 @@ public class TransportClientFactorySuite {
}
@Test
- public void returnDifferentClientsForDifferentServers() throws IOException {
+ public void returnDifferentClientsForDifferentServers() throws IOException, InterruptedException {
TransportClientFactory factory = context.createClientFactory();
TransportClient c1 = factory.createClient(TestUtils.getLocalHost(), server1.getPort());
TransportClient c2 = factory.createClient(TestUtils.getLocalHost(), server2.getPort());
@@ -171,7 +173,7 @@ public class TransportClientFactorySuite {
}
@Test
- public void closeBlockClientsWithFactory() throws IOException {
+ public void closeBlockClientsWithFactory() throws IOException, InterruptedException {
TransportClientFactory factory = context.createClientFactory();
TransportClient c1 = factory.createClient(TestUtils.getLocalHost(), server1.getPort());
TransportClient c2 = factory.createClient(TestUtils.getLocalHost(), server2.getPort());
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java
index 616505d979..8c0c400966 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java
@@ -94,7 +94,7 @@ public class ExternalShuffleClient extends ShuffleClient {
new RetryingBlockFetcher.BlockFetchStarter() {
@Override
public void createAndStart(String[] blockIds, BlockFetchingListener listener)
- throws IOException {
+ throws IOException, InterruptedException {
TransportClient client = clientFactory.createClient(host, port);
new OneForOneBlockFetcher(client, appId, execId, blockIds, listener).start();
}
@@ -129,7 +129,7 @@ public class ExternalShuffleClient extends ShuffleClient {
String host,
int port,
String execId,
- ExecutorShuffleInfo executorInfo) throws IOException {
+ ExecutorShuffleInfo executorInfo) throws IOException, InterruptedException {
checkInit();
TransportClient client = clientFactory.createUnmanagedClient(host, port);
try {
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockFetcher.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockFetcher.java
index 72bd0f803d..5be855048e 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockFetcher.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockFetcher.java
@@ -57,7 +57,8 @@ public class RetryingBlockFetcher {
* {@link org.apache.spark.network.client.TransportClientFactory} in order to fix connection
* issues.
*/
- void createAndStart(String[] blockIds, BlockFetchingListener listener) throws IOException;
+ void createAndStart(String[] blockIds, BlockFetchingListener listener)
+ throws IOException, InterruptedException;
}
/** Shared executor service used for waiting and retrying. */
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/mesos/MesosExternalShuffleClient.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/mesos/MesosExternalShuffleClient.java
index ab49b1c1d7..dbc1010847 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/mesos/MesosExternalShuffleClient.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/mesos/MesosExternalShuffleClient.java
@@ -68,7 +68,7 @@ public class MesosExternalShuffleClient extends ExternalShuffleClient {
String host,
int port,
long heartbeatTimeoutMs,
- long heartbeatIntervalMs) throws IOException {
+ long heartbeatIntervalMs) throws IOException, InterruptedException {
checkInit();
ByteBuffer registerDriver = new RegisterDriver(appId, heartbeatTimeoutMs).toByteBuffer();
diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/sasl/SaslIntegrationSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/sasl/SaslIntegrationSuite.java
index 298a487ebb..52f50a3409 100644
--- a/common/network-shuffle/src/test/java/org/apache/spark/network/sasl/SaslIntegrationSuite.java
+++ b/common/network-shuffle/src/test/java/org/apache/spark/network/sasl/SaslIntegrationSuite.java
@@ -103,7 +103,7 @@ public class SaslIntegrationSuite {
}
@Test
- public void testGoodClient() throws IOException {
+ public void testGoodClient() throws IOException, InterruptedException {
clientFactory = context.createClientFactory(
Lists.<TransportClientBootstrap>newArrayList(
new SaslClientBootstrap(conf, "app-1", secretKeyHolder)));
@@ -133,7 +133,7 @@ public class SaslIntegrationSuite {
}
@Test
- public void testNoSaslClient() throws IOException {
+ public void testNoSaslClient() throws IOException, InterruptedException {
clientFactory = context.createClientFactory(
Lists.<TransportClientBootstrap>newArrayList());
diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java
index 9248ef3c46..88de6fb83c 100644
--- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java
+++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java
@@ -242,7 +242,7 @@ public class ExternalShuffleIntegrationSuite {
}
private void registerExecutor(String executorId, ExecutorShuffleInfo executorInfo)
- throws IOException {
+ throws IOException, InterruptedException {
ExternalShuffleClient client = new ExternalShuffleClient(conf, null, false);
client.init(APP_ID);
client.registerWithShuffleServer(TestUtils.getLocalHost(), server.getPort(),
diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleSecuritySuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleSecuritySuite.java
index 4ae75a1b17..bf20c577ed 100644
--- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleSecuritySuite.java
+++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleSecuritySuite.java
@@ -60,7 +60,7 @@ public class ExternalShuffleSecuritySuite {
}
@Test
- public void testValid() throws IOException {
+ public void testValid() throws IOException, InterruptedException {
validate("my-app-id", "secret", false);
}
@@ -83,12 +83,13 @@ public class ExternalShuffleSecuritySuite {
}
@Test
- public void testEncryption() throws IOException {
+ public void testEncryption() throws IOException, InterruptedException {
validate("my-app-id", "secret", true);
}
/** Creates an ExternalShuffleClient and attempts to register with the server. */
- private void validate(String appId, String secretKey, boolean encrypt) throws IOException {
+ private void validate(String appId, String secretKey, boolean encrypt)
+ throws IOException, InterruptedException {
TransportConf testConf = conf;
if (encrypt) {
testConf = new TransportConf("shuffle", new MapConfigProvider(
diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RetryingBlockFetcherSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RetryingBlockFetcherSuite.java
index a2509f5f34..6db71eea6e 100644
--- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RetryingBlockFetcherSuite.java
+++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RetryingBlockFetcherSuite.java
@@ -52,7 +52,7 @@ public class RetryingBlockFetcherSuite {
ManagedBuffer block2 = new NioManagedBuffer(ByteBuffer.wrap(new byte[19]));
@Test
- public void testNoFailures() throws IOException {
+ public void testNoFailures() throws IOException, InterruptedException {
BlockFetchingListener listener = mock(BlockFetchingListener.class);
List<? extends Map<String, Object>> interactions = Arrays.asList(
@@ -71,7 +71,7 @@ public class RetryingBlockFetcherSuite {
}
@Test
- public void testUnrecoverableFailure() throws IOException {
+ public void testUnrecoverableFailure() throws IOException, InterruptedException {
BlockFetchingListener listener = mock(BlockFetchingListener.class);
List<? extends Map<String, Object>> interactions = Arrays.asList(
@@ -90,7 +90,7 @@ public class RetryingBlockFetcherSuite {
}
@Test
- public void testSingleIOExceptionOnFirst() throws IOException {
+ public void testSingleIOExceptionOnFirst() throws IOException, InterruptedException {
BlockFetchingListener listener = mock(BlockFetchingListener.class);
List<? extends Map<String, Object>> interactions = Arrays.asList(
@@ -113,7 +113,7 @@ public class RetryingBlockFetcherSuite {
}
@Test
- public void testSingleIOExceptionOnSecond() throws IOException {
+ public void testSingleIOExceptionOnSecond() throws IOException, InterruptedException {
BlockFetchingListener listener = mock(BlockFetchingListener.class);
List<? extends Map<String, Object>> interactions = Arrays.asList(
@@ -135,7 +135,7 @@ public class RetryingBlockFetcherSuite {
}
@Test
- public void testTwoIOExceptions() throws IOException {
+ public void testTwoIOExceptions() throws IOException, InterruptedException {
BlockFetchingListener listener = mock(BlockFetchingListener.class);
List<? extends Map<String, Object>> interactions = Arrays.asList(
@@ -163,7 +163,7 @@ public class RetryingBlockFetcherSuite {
}
@Test
- public void testThreeIOExceptions() throws IOException {
+ public void testThreeIOExceptions() throws IOException, InterruptedException {
BlockFetchingListener listener = mock(BlockFetchingListener.class);
List<? extends Map<String, Object>> interactions = Arrays.asList(
@@ -195,7 +195,7 @@ public class RetryingBlockFetcherSuite {
}
@Test
- public void testRetryAndUnrecoverable() throws IOException {
+ public void testRetryAndUnrecoverable() throws IOException, InterruptedException {
BlockFetchingListener listener = mock(BlockFetchingListener.class);
List<? extends Map<String, Object>> interactions = Arrays.asList(
@@ -238,7 +238,7 @@ public class RetryingBlockFetcherSuite {
@SuppressWarnings("unchecked")
private static void performInteractions(List<? extends Map<String, Object>> interactions,
BlockFetchingListener listener)
- throws IOException {
+ throws IOException, InterruptedException {
MapConfigProvider provider = new MapConfigProvider(ImmutableMap.of(
"spark.shuffle.io.maxRetries", "2",