aboutsummaryrefslogtreecommitdiff
path: root/core/src/main/java/org/apache/spark/network/netty/FileClient.java
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/main/java/org/apache/spark/network/netty/FileClient.java')
-rw-r--r--core/src/main/java/org/apache/spark/network/netty/FileClient.java32
1 files changed, 22 insertions, 10 deletions
diff --git a/core/src/main/java/org/apache/spark/network/netty/FileClient.java b/core/src/main/java/org/apache/spark/network/netty/FileClient.java
index edd0fc56f8..46d61503bc 100644
--- a/core/src/main/java/org/apache/spark/network/netty/FileClient.java
+++ b/core/src/main/java/org/apache/spark/network/netty/FileClient.java
@@ -20,19 +20,24 @@ package org.apache.spark.network.netty;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelOption;
+import io.netty.channel.EventLoopGroup;
import io.netty.channel.oio.OioEventLoopGroup;
import io.netty.channel.socket.oio.OioSocketChannel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.concurrent.TimeUnit;
+
class FileClient {
private Logger LOG = LoggerFactory.getLogger(this.getClass().getName());
- private FileClientHandler handler = null;
+ private final FileClientHandler handler;
private Channel channel = null;
private Bootstrap bootstrap = null;
- private int connectTimeout = 60*1000; // 1 min
+ private EventLoopGroup group = null;
+ private final int connectTimeout;
+ private final int sendTimeout = 60; // 1 min
public FileClient(FileClientHandler handler, int connectTimeout) {
this.handler = handler;
@@ -40,8 +45,9 @@ class FileClient {
}
public void init() {
+ group = new OioEventLoopGroup();
bootstrap = new Bootstrap();
- bootstrap.group(new OioEventLoopGroup())
+ bootstrap.group(group)
.channel(OioSocketChannel.class)
.option(ChannelOption.SO_KEEPALIVE, true)
.option(ChannelOption.TCP_NODELAY, true)
@@ -56,6 +62,7 @@ class FileClient {
// ChannelFuture cf = channel.closeFuture();
//cf.addListener(new ChannelCloseListener(this));
} catch (InterruptedException e) {
+ LOG.warn("FileClient interrupted while trying to connect", e);
close();
}
}
@@ -71,16 +78,21 @@ class FileClient {
public void sendRequest(String file) {
//assert(file == null);
//assert(channel == null);
- channel.write(file + "\r\n");
+ try {
+ // Should be able to send the message to network link channel.
+ boolean bSent = channel.writeAndFlush(file + "\r\n").await(sendTimeout, TimeUnit.SECONDS);
+ if (!bSent) {
+ throw new RuntimeException("Failed to send");
+ }
+ } catch (InterruptedException e) {
+ LOG.error("Error", e);
+ }
}
public void close() {
- if(channel != null) {
- channel.close();
- channel = null;
- }
- if ( bootstrap!=null) {
- bootstrap.shutdown();
+ if (group != null) {
+ group.shutdownGracefully();
+ group = null;
bootstrap = null;
}
}