aboutsummaryrefslogtreecommitdiff
path: root/core/src/main/java
diff options
context:
space:
mode:
authorShivaram Venkataraman <shivaram@eecs.berkeley.edu>2013-05-31 23:21:38 -0700
committerShivaram Venkataraman <shivaram@eecs.berkeley.edu>2013-05-31 23:21:38 -0700
commit91aca9224936da84b16ea789cb81914579a0db03 (patch)
tree79319339baeb37a08c76ca32f426ed2b83f7238c /core/src/main/java
parent84530ba6d9fa47ee2863bb50c23742ecfa2a6a64 (diff)
downloadspark-91aca9224936da84b16ea789cb81914579a0db03.tar.gz
spark-91aca9224936da84b16ea789cb81914579a0db03.tar.bz2
spark-91aca9224936da84b16ea789cb81914579a0db03.zip
Another round of Netty fixes.
1. Avoid race condition between stop and copier completion 2. Handle socket exceptions by reporting them and filling in a failed FetchResult
Diffstat (limited to 'core/src/main/java')
-rw-r--r--core/src/main/java/spark/network/netty/FileClient.java24
-rw-r--r--core/src/main/java/spark/network/netty/FileClientHandler.java8
2 files changed, 14 insertions, 18 deletions
diff --git a/core/src/main/java/spark/network/netty/FileClient.java b/core/src/main/java/spark/network/netty/FileClient.java
index 3a62dacbc8..9c9b976ebe 100644
--- a/core/src/main/java/spark/network/netty/FileClient.java
+++ b/core/src/main/java/spark/network/netty/FileClient.java
@@ -8,9 +8,12 @@ import io.netty.channel.ChannelOption;
import io.netty.channel.oio.OioEventLoopGroup;
import io.netty.channel.socket.oio.OioSocketChannel;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
class FileClient {
+ private Logger LOG = LoggerFactory.getLogger(this.getClass().getName());
private FileClientHandler handler = null;
private Channel channel = null;
private Bootstrap bootstrap = null;
@@ -25,25 +28,10 @@ class FileClient {
.channel(OioSocketChannel.class)
.option(ChannelOption.SO_KEEPALIVE, true)
.option(ChannelOption.TCP_NODELAY, true)
+ .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 0) // Disable connect timeout
.handler(new FileClientChannelInitializer(handler));
}
- public static final class ChannelCloseListener implements ChannelFutureListener {
- private FileClient fc = null;
-
- public ChannelCloseListener(FileClient fc){
- this.fc = fc;
- }
-
- @Override
- public void operationComplete(ChannelFuture future) {
- if (fc.bootstrap!=null){
- fc.bootstrap.shutdown();
- fc.bootstrap = null;
- }
- }
- }
-
public void connect(String host, int port) {
try {
// Start the connection attempt.
@@ -58,8 +46,8 @@ class FileClient {
public void waitForClose() {
try {
channel.closeFuture().sync();
- } catch (InterruptedException e){
- e.printStackTrace();
+ } catch (InterruptedException e) {
+ LOG.warn("FileClient interrupted", e);
}
}
diff --git a/core/src/main/java/spark/network/netty/FileClientHandler.java b/core/src/main/java/spark/network/netty/FileClientHandler.java
index 2069dee5ca..9fc9449827 100644
--- a/core/src/main/java/spark/network/netty/FileClientHandler.java
+++ b/core/src/main/java/spark/network/netty/FileClientHandler.java
@@ -9,7 +9,14 @@ abstract class FileClientHandler extends ChannelInboundByteHandlerAdapter {
private FileHeader currentHeader = null;
+ private volatile boolean handlerCalled = false;
+
+ public boolean isComplete() {
+ return handlerCalled;
+ }
+
public abstract void handle(ChannelHandlerContext ctx, ByteBuf in, FileHeader header);
+ public abstract void handleError(String blockId);
@Override
public ByteBuf newInboundBuffer(ChannelHandlerContext ctx) {
@@ -26,6 +33,7 @@ abstract class FileClientHandler extends ChannelInboundByteHandlerAdapter {
// get file
if(in.readableBytes() >= currentHeader.fileLen()) {
handle(ctx, in, currentHeader);
+ handlerCalled = true;
currentHeader = null;
ctx.close();
}