aboutsummaryrefslogtreecommitdiff
path: root/network/common
diff options
context:
space:
mode:
authorMarcelo Vanzin <vanzin@cloudera.com>2015-11-23 13:51:43 -0800
committerMarcelo Vanzin <vanzin@cloudera.com>2015-11-23 13:51:43 -0800
commit7cfa4c6bc36d97e459d4adee7b03d537d63c337e (patch)
tree93c9db79e4857ade4d542515e6050a39c132bdc3 /network/common
parent242be7daed9b01d19794bb2cf1ac421fe5ab7262 (diff)
downloadspark-7cfa4c6bc36d97e459d4adee7b03d537d63c337e.tar.gz
spark-7cfa4c6bc36d97e459d4adee7b03d537d63c337e.tar.bz2
spark-7cfa4c6bc36d97e459d4adee7b03d537d63c337e.zip
[SPARK-11865][NETWORK] Avoid returning inactive client in TransportClientFactory.
There's a very narrow race here where it would be possible for the timeout handler to close a channel after the client factory verified that the channel was still active. This change makes sure the client is marked as being recently in use so that the timeout handler does not close it until a new timeout cycle elapses. Author: Marcelo Vanzin <vanzin@cloudera.com> Closes #9853 from vanzin/SPARK-11865.
Diffstat (limited to 'network/common')
-rw-r--r--network/common/src/main/java/org/apache/spark/network/client/TransportClient.java9
-rw-r--r--network/common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java15
-rw-r--r--network/common/src/main/java/org/apache/spark/network/client/TransportResponseHandler.java9
-rw-r--r--network/common/src/main/java/org/apache/spark/network/server/TransportChannelHandler.java36
4 files changed, 52 insertions, 17 deletions
diff --git a/network/common/src/main/java/org/apache/spark/network/client/TransportClient.java b/network/common/src/main/java/org/apache/spark/network/client/TransportClient.java
index a0ba223e34..876fcd8467 100644
--- a/network/common/src/main/java/org/apache/spark/network/client/TransportClient.java
+++ b/network/common/src/main/java/org/apache/spark/network/client/TransportClient.java
@@ -73,10 +73,12 @@ public class TransportClient implements Closeable {
private final Channel channel;
private final TransportResponseHandler handler;
@Nullable private String clientId;
+ private volatile boolean timedOut;
public TransportClient(Channel channel, TransportResponseHandler handler) {
this.channel = Preconditions.checkNotNull(channel);
this.handler = Preconditions.checkNotNull(handler);
+ this.timedOut = false;
}
public Channel getChannel() {
@@ -84,7 +86,7 @@ public class TransportClient implements Closeable {
}
public boolean isActive() {
- return channel.isOpen() || channel.isActive();
+ return !timedOut && (channel.isOpen() || channel.isActive());
}
public SocketAddress getSocketAddress() {
@@ -263,6 +265,11 @@ public class TransportClient implements Closeable {
}
}
+ /** Mark this channel as having timed out. */
+ public void timeOut() {
+ this.timedOut = true;
+ }
+
@Override
public void close() {
// close is a local operation and should finish with milliseconds; timeout just to be safe
diff --git a/network/common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java b/network/common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java
index 42a4f664e6..659c47160c 100644
--- a/network/common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java
+++ b/network/common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java
@@ -136,8 +136,19 @@ public class TransportClientFactory implements Closeable {
TransportClient cachedClient = clientPool.clients[clientIndex];
if (cachedClient != null && cachedClient.isActive()) {
- logger.trace("Returning cached connection to {}: {}", address, cachedClient);
- return cachedClient;
+ // Make sure that the channel will not timeout by updating the last use time of the
+ // handler. Then check that the client is still alive, in case it timed out before
+ // this code was able to update things.
+ TransportChannelHandler handler = cachedClient.getChannel().pipeline()
+ .get(TransportChannelHandler.class);
+ synchronized (handler) {
+ handler.getResponseHandler().updateTimeOfLastRequest();
+ }
+
+ if (cachedClient.isActive()) {
+ logger.trace("Returning cached connection to {}: {}", address, cachedClient);
+ return cachedClient;
+ }
}
// If we reach here, we don't have an existing connection open. Let's create a new one.
diff --git a/network/common/src/main/java/org/apache/spark/network/client/TransportResponseHandler.java b/network/common/src/main/java/org/apache/spark/network/client/TransportResponseHandler.java
index cc88991b58..be181e0660 100644
--- a/network/common/src/main/java/org/apache/spark/network/client/TransportResponseHandler.java
+++ b/network/common/src/main/java/org/apache/spark/network/client/TransportResponseHandler.java
@@ -71,7 +71,7 @@ public class TransportResponseHandler extends MessageHandler<ResponseMessage> {
}
public void addFetchRequest(StreamChunkId streamChunkId, ChunkReceivedCallback callback) {
- timeOfLastRequestNs.set(System.nanoTime());
+ updateTimeOfLastRequest();
outstandingFetches.put(streamChunkId, callback);
}
@@ -80,7 +80,7 @@ public class TransportResponseHandler extends MessageHandler<ResponseMessage> {
}
public void addRpcRequest(long requestId, RpcResponseCallback callback) {
- timeOfLastRequestNs.set(System.nanoTime());
+ updateTimeOfLastRequest();
outstandingRpcs.put(requestId, callback);
}
@@ -227,4 +227,9 @@ public class TransportResponseHandler extends MessageHandler<ResponseMessage> {
return timeOfLastRequestNs.get();
}
+ /** Updates the time of the last request to the current system time. */
+ public void updateTimeOfLastRequest() {
+ timeOfLastRequestNs.set(System.nanoTime());
+ }
+
}
diff --git a/network/common/src/main/java/org/apache/spark/network/server/TransportChannelHandler.java b/network/common/src/main/java/org/apache/spark/network/server/TransportChannelHandler.java
index f8fcd1c3d7..29d688a675 100644
--- a/network/common/src/main/java/org/apache/spark/network/server/TransportChannelHandler.java
+++ b/network/common/src/main/java/org/apache/spark/network/server/TransportChannelHandler.java
@@ -116,20 +116,32 @@ public class TransportChannelHandler extends SimpleChannelInboundHandler<Message
// there are outstanding requests, we also do a secondary consistency check to ensure
// there's no race between the idle timeout and incrementing the numOutstandingRequests
// (see SPARK-7003).
- boolean isActuallyOverdue =
- System.nanoTime() - responseHandler.getTimeOfLastRequestNs() > requestTimeoutNs;
- if (e.state() == IdleState.ALL_IDLE && isActuallyOverdue) {
- if (responseHandler.numOutstandingRequests() > 0) {
- String address = NettyUtils.getRemoteAddress(ctx.channel());
- logger.error("Connection to {} has been quiet for {} ms while there are outstanding " +
- "requests. Assuming connection is dead; please adjust spark.network.timeout if this " +
- "is wrong.", address, requestTimeoutNs / 1000 / 1000);
- ctx.close();
- } else if (closeIdleConnections) {
- // While CloseIdleConnections is enable, we also close idle connection
- ctx.close();
+ //
+ // To avoid a race between TransportClientFactory.createClient() and this code which could
+ // result in an inactive client being returned, this needs to run in a synchronized block.
+ synchronized (this) {
+ boolean isActuallyOverdue =
+ System.nanoTime() - responseHandler.getTimeOfLastRequestNs() > requestTimeoutNs;
+ if (e.state() == IdleState.ALL_IDLE && isActuallyOverdue) {
+ if (responseHandler.numOutstandingRequests() > 0) {
+ String address = NettyUtils.getRemoteAddress(ctx.channel());
+ logger.error("Connection to {} has been quiet for {} ms while there are outstanding " +
+ "requests. Assuming connection is dead; please adjust spark.network.timeout if this " +
+ "is wrong.", address, requestTimeoutNs / 1000 / 1000);
+ client.timeOut();
+ ctx.close();
+ } else if (closeIdleConnections) {
+ // While CloseIdleConnections is enable, we also close idle connection
+ client.timeOut();
+ ctx.close();
+ }
}
}
}
}
+
+ public TransportResponseHandler getResponseHandler() {
+ return responseHandler;
+ }
+
}