-rw-r--r--  network/common/src/main/java/org/apache/spark/network/client/TransportClient.java           |  9
-rw-r--r--  network/common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java    | 15
-rw-r--r--  network/common/src/main/java/org/apache/spark/network/client/TransportResponseHandler.java  |  9
-rw-r--r--  network/common/src/main/java/org/apache/spark/network/server/TransportChannelHandler.java   | 36
4 files changed, 52 insertions(+), 17 deletions(-)
diff --git a/network/common/src/main/java/org/apache/spark/network/client/TransportClient.java b/network/common/src/main/java/org/apache/spark/network/client/TransportClient.java
index a0ba223e34..876fcd8467 100644
--- a/network/common/src/main/java/org/apache/spark/network/client/TransportClient.java
+++ b/network/common/src/main/java/org/apache/spark/network/client/TransportClient.java
@@ -73,10 +73,12 @@ public class TransportClient implements Closeable {
   private final Channel channel;
   private final TransportResponseHandler handler;
   @Nullable private String clientId;
+  private volatile boolean timedOut;
 
   public TransportClient(Channel channel, TransportResponseHandler handler) {
     this.channel = Preconditions.checkNotNull(channel);
     this.handler = Preconditions.checkNotNull(handler);
+    this.timedOut = false;
   }
 
   public Channel getChannel() {
@@ -84,7 +86,7 @@ public class TransportClient implements Closeable {
   }
 
   public boolean isActive() {
-    return channel.isOpen() || channel.isActive();
+    return !timedOut && (channel.isOpen() || channel.isActive());
   }
 
   public SocketAddress getSocketAddress() {
@@ -263,6 +265,11 @@ public class TransportClient implements Closeable {
     }
   }
 
+  /** Mark this channel as having timed out. */
+  public void timeOut() {
+    this.timedOut = true;
+  }
+
   @Override
   public void close() {
     // close is a local operation and should finish with milliseconds; timeout just to be safe
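
The TransportClient change above introduces a volatile timedOut flag that the idle-timeout path can set, so isActive() reports a dead client even before the channel has finished closing. A minimal standalone sketch of that flag pattern (hypothetical ConnectionHandle class, not Spark code):

// Hypothetical illustration of the volatile "timed out" flag pattern used above.
public class ConnectionHandle {
  // volatile so a timeout flagged on one thread is immediately visible to callers
  // that check liveness from other threads.
  private volatile boolean timedOut = false;
  private volatile boolean open = true;

  /** Liveness now also requires that no timeout has been flagged. */
  public boolean isActive() {
    return !timedOut && open;
  }

  /** Called by the timeout detector before it starts closing the connection. */
  public void timeOut() {
    this.timedOut = true;
  }

  public static void main(String[] args) {
    ConnectionHandle handle = new ConnectionHandle();
    System.out.println(handle.isActive()); // true
    handle.timeOut();                      // mark as dead before the close completes
    System.out.println(handle.isActive()); // false, even though open is still true
  }
}
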
diff --git a/network/common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java b/network/common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java
index 42a4f664e6..659c47160c 100644
--- a/network/common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java
+++ b/network/common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java
@@ -136,8 +136,19 @@ public class TransportClientFactory implements Closeable {
     TransportClient cachedClient = clientPool.clients[clientIndex];
 
     if (cachedClient != null && cachedClient.isActive()) {
-      logger.trace("Returning cached connection to {}: {}", address, cachedClient);
-      return cachedClient;
+      // Make sure that the channel will not timeout by updating the last use time of the
+      // handler. Then check that the client is still alive, in case it timed out before
+      // this code was able to update things.
+      TransportChannelHandler handler = cachedClient.getChannel().pipeline()
+        .get(TransportChannelHandler.class);
+      synchronized (handler) {
+        handler.getResponseHandler().updateTimeOfLastRequest();
+      }
+
+      if (cachedClient.isActive()) {
+        logger.trace("Returning cached connection to {}: {}", address, cachedClient);
+        return cachedClient;
+      }
     }
 
     // If we reach here, we don't have an existing connection open. Let's create a new one.
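
createClient() now follows a refresh-then-recheck sequence before returning a pooled client: bump the handler's last-request time under the handler's lock, then verify isActive() again in case the idle check marked the client dead first. A rough sketch of that sequence with hypothetical Handler/Client stand-ins (not the factory's real pooling code):

// Hypothetical sketch of the refresh-then-recheck sequence used by createClient() above.
public final class PoolLookup {

  /** Minimal stand-ins for the channel handler and the pooled client. */
  static final class Handler {
    private long lastRequestNanos = System.nanoTime();
    void updateTimeOfLastRequest() { lastRequestNanos = System.nanoTime(); }
  }

  static final class Client {
    final Handler handler = new Handler();
    volatile boolean timedOut = false;
    boolean isActive() { return !timedOut; }
  }

  /** Returns the cached client only if it is still alive after the refresh. */
  static Client getFromPool(Client cached) {
    if (cached != null && cached.isActive()) {
      // 1. Refresh the last-use time so a pending idle check no longer sees it as overdue.
      synchronized (cached.handler) {
        cached.handler.updateTimeOfLastRequest();
      }
      // 2. Re-check: the timeout handler may have marked the client dead before step 1 ran.
      if (cached.isActive()) {
        return cached;
      }
    }
    return null; // caller would create a fresh connection here
  }

  public static void main(String[] args) {
    Client cached = new Client();
    System.out.println(getFromPool(cached) != null);  // true: refreshed and still active
    cached.timedOut = true;                           // simulate the idle handler killing it
    System.out.println(getFromPool(cached) != null);  // false: caller must reconnect
  }
}

Either outcome is safe: the caller gets a client that this timeout cycle will not close underneath it, or it falls through and opens a fresh connection.
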
diff --git a/network/common/src/main/java/org/apache/spark/network/client/TransportResponseHandler.java b/network/common/src/main/java/org/apache/spark/network/client/TransportResponseHandler.java
index cc88991b58..be181e0660 100644
--- a/network/common/src/main/java/org/apache/spark/network/client/TransportResponseHandler.java
+++ b/network/common/src/main/java/org/apache/spark/network/client/TransportResponseHandler.java
@@ -71,7 +71,7 @@ public class TransportResponseHandler extends MessageHandler<ResponseMessage> {
   }
 
   public void addFetchRequest(StreamChunkId streamChunkId, ChunkReceivedCallback callback) {
-    timeOfLastRequestNs.set(System.nanoTime());
+    updateTimeOfLastRequest();
     outstandingFetches.put(streamChunkId, callback);
   }
 
@@ -80,7 +80,7 @@ public class TransportResponseHandler extends MessageHandler<ResponseMessage> {
   }
 
   public void addRpcRequest(long requestId, RpcResponseCallback callback) {
-    timeOfLastRequestNs.set(System.nanoTime());
+    updateTimeOfLastRequest();
     outstandingRpcs.put(requestId, callback);
   }
 
@@ -227,4 +227,9 @@ public class TransportResponseHandler extends MessageHandler<ResponseMessage> {
     return timeOfLastRequestNs.get();
   }
 
+  /** Updates the time of the last request to the current system time. */
+  public void updateTimeOfLastRequest() {
+    timeOfLastRequestNs.set(System.nanoTime());
+  }
+
 }
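
The new updateTimeOfLastRequest() helper centralizes the timestamp bump that addFetchRequest() and addRpcRequest() previously did inline, which is what lets the factory refresh the clock without issuing a fake request. A small sketch of the idea, assuming an AtomicLong of System.nanoTime() values like timeOfLastRequestNs (hypothetical LastRequestClock class):

import java.util.concurrent.atomic.AtomicLong;

// Sketch of a nanosecond "last activity" clock shared between request paths and idle checks.
public class LastRequestClock {
  private final AtomicLong timeOfLastRequestNs = new AtomicLong(0);

  /** Record activity; called both when a request is issued and when a pooled client is reused. */
  public void updateTimeOfLastRequest() {
    timeOfLastRequestNs.set(System.nanoTime());
  }

  /** True if no activity has been recorded within the given timeout. */
  public boolean isOverdue(long timeoutNs) {
    return System.nanoTime() - timeOfLastRequestNs.get() > timeoutNs;
  }

  public static void main(String[] args) throws InterruptedException {
    LastRequestClock clock = new LastRequestClock();
    clock.updateTimeOfLastRequest();
    Thread.sleep(20);
    System.out.println(clock.isOverdue(10_000_000L));  // true: ~20 ms elapsed > 10 ms timeout
    clock.updateTimeOfLastRequest();
    System.out.println(clock.isOverdue(10_000_000L));  // false: just refreshed
  }
}
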
diff --git a/network/common/src/main/java/org/apache/spark/network/server/TransportChannelHandler.java b/network/common/src/main/java/org/apache/spark/network/server/TransportChannelHandler.java
index f8fcd1c3d7..29d688a675 100644
--- a/network/common/src/main/java/org/apache/spark/network/server/TransportChannelHandler.java
+++ b/network/common/src/main/java/org/apache/spark/network/server/TransportChannelHandler.java
@@ -116,20 +116,32 @@ public class TransportChannelHandler extends SimpleChannelInboundHandler<Message
       // there are outstanding requests, we also do a secondary consistency check to ensure
       // there's no race between the idle timeout and incrementing the numOutstandingRequests
       // (see SPARK-7003).
-      boolean isActuallyOverdue =
-        System.nanoTime() - responseHandler.getTimeOfLastRequestNs() > requestTimeoutNs;
-      if (e.state() == IdleState.ALL_IDLE && isActuallyOverdue) {
-        if (responseHandler.numOutstandingRequests() > 0) {
-          String address = NettyUtils.getRemoteAddress(ctx.channel());
-          logger.error("Connection to {} has been quiet for {} ms while there are outstanding " +
-            "requests. Assuming connection is dead; please adjust spark.network.timeout if this " +
-            "is wrong.", address, requestTimeoutNs / 1000 / 1000);
-          ctx.close();
-        } else if (closeIdleConnections) {
-          // While CloseIdleConnections is enable, we also close idle connection
-          ctx.close();
+      //
+      // To avoid a race between TransportClientFactory.createClient() and this code which could
+      // result in an inactive client being returned, this needs to run in a synchronized block.
+      synchronized (this) {
+        boolean isActuallyOverdue =
+          System.nanoTime() - responseHandler.getTimeOfLastRequestNs() > requestTimeoutNs;
+        if (e.state() == IdleState.ALL_IDLE && isActuallyOverdue) {
+          if (responseHandler.numOutstandingRequests() > 0) {
+            String address = NettyUtils.getRemoteAddress(ctx.channel());
+            logger.error("Connection to {} has been quiet for {} ms while there are outstanding " +
+              "requests. Assuming connection is dead; please adjust spark.network.timeout if this " +
+              "is wrong.", address, requestTimeoutNs / 1000 / 1000);
+            client.timeOut();
+            ctx.close();
+          } else if (closeIdleConnections) {
+            // While CloseIdleConnections is enable, we also close idle connection
+            client.timeOut();
+            ctx.close();
+          }
         }
       }
     }
   }
+
+  public TransportResponseHandler getResponseHandler() {
+    return responseHandler;
+  }
+
 }
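
Both sides of the race now take the same lock: the factory's refresh synchronizes on the TransportChannelHandler, and the idle check above runs its overdue test inside synchronized (this), so refresh-and-recheck can never interleave with check-and-close. The hypothetical two-thread sketch below illustrates that locking discipline only; it is not Spark's Netty wiring:

// Hypothetical two-thread illustration of the locking introduced above: the idle checker
// and the pool lookup both synchronize on the handler, so a client is either refreshed
// and returned, or marked timed out and skipped -- never returned after being killed.
public class IdleCheckRace {

  static class Handler {
    // Start "stale" (100 ms ago) so the outcome depends on which thread takes the lock first.
    private long lastRequestNs = System.nanoTime() - 100_000_000L;
    volatile boolean clientTimedOut = false;

    void updateTimeOfLastRequest() { lastRequestNs = System.nanoTime(); }

    /** Idle check: runs entirely under the handler's lock, like userEventTriggered() above. */
    void idleCheck(long timeoutNs) {
      synchronized (this) {
        boolean overdue = System.nanoTime() - lastRequestNs > timeoutNs;
        if (overdue) {
          clientTimedOut = true;   // mark dead before closing, as client.timeOut() does
          // ctx.close() would follow here in the real handler
        }
      }
    }

    /** Pool lookup: refresh under the same lock, then re-check liveness. */
    boolean tryReuse() {
      synchronized (this) {
        updateTimeOfLastRequest();
      }
      return !clientTimedOut;
    }
  }

  public static void main(String[] args) throws Exception {
    Handler handler = new Handler();
    Thread idleChecker = new Thread(() -> handler.idleCheck(50_000_000L)); // 50 ms timeout
    Thread poolLookup = new Thread(() -> System.out.println("reusable: " + handler.tryReuse()));
    idleChecker.start();
    poolLookup.start();
    idleChecker.join();
    poolLookup.join();
    // Whichever thread takes the lock second sees a consistent state: either the client was
    // refreshed before the check (no longer overdue, stays alive) or it was timed out before
    // the lookup (tryReuse() returns false and the caller opens a new connection).
  }
}
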