From 2c5bade4ee6db747cbc7b0884094ad443834e3b1 Mon Sep 17 00:00:00 2001 From: Binh Nguyen Date: Fri, 27 Dec 2013 11:18:27 -0800 Subject: Fix failed unit tests Also clean up a bit. --- .../org/apache/spark/network/netty/FileClient.java | 25 ++++++++++++++-------- .../netty/FileServerChannelInitializer.java | 2 +- .../spark/network/netty/FileServerHandler.java | 10 ++++++--- 3 files changed, 24 insertions(+), 13 deletions(-) (limited to 'core/src/main/java') diff --git a/core/src/main/java/org/apache/spark/network/netty/FileClient.java b/core/src/main/java/org/apache/spark/network/netty/FileClient.java index 6b7f6a9397..46d61503bc 100644 --- a/core/src/main/java/org/apache/spark/network/netty/FileClient.java +++ b/core/src/main/java/org/apache/spark/network/netty/FileClient.java @@ -27,14 +27,17 @@ import io.netty.channel.socket.oio.OioSocketChannel; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.concurrent.TimeUnit; + class FileClient { private Logger LOG = LoggerFactory.getLogger(this.getClass().getName()); - private FileClientHandler handler = null; + private final FileClientHandler handler; private Channel channel = null; private Bootstrap bootstrap = null; private EventLoopGroup group = null; - private int connectTimeout = 60*1000; // 1 min + private final int connectTimeout; + private final int sendTimeout = 60; // 1 min public FileClient(FileClientHandler handler, int connectTimeout) { this.handler = handler; @@ -43,7 +46,7 @@ class FileClient { public void init() { group = new OioEventLoopGroup(); - Bootstrap bootstrap = new Bootstrap(); + bootstrap = new Bootstrap(); bootstrap.group(group) .channel(OioSocketChannel.class) .option(ChannelOption.SO_KEEPALIVE, true) @@ -59,6 +62,7 @@ class FileClient { // ChannelFuture cf = channel.closeFuture(); //cf.addListener(new ChannelCloseListener(this)); } catch (InterruptedException e) { + LOG.warn("FileClient interrupted while trying to connect", e); close(); } } @@ -74,15 +78,18 @@ class FileClient { public void sendRequest(String file) { //assert(file == null); //assert(channel == null); - channel.write(file + "\r\n"); + try { + // Should be able to send the message to network link channel. + boolean bSent = channel.writeAndFlush(file + "\r\n").await(sendTimeout, TimeUnit.SECONDS); + if (!bSent) { + throw new RuntimeException("Failed to send"); + } + } catch (InterruptedException e) { + LOG.error("Error", e); + } } public void close() { - if(channel != null) { - channel.close().awaitUninterruptibly(); - channel = null; - } - if (group != null) { group.shutdownGracefully(); group = null; diff --git a/core/src/main/java/org/apache/spark/network/netty/FileServerChannelInitializer.java b/core/src/main/java/org/apache/spark/network/netty/FileServerChannelInitializer.java index f1f264c583..3f15ff898f 100644 --- a/core/src/main/java/org/apache/spark/network/netty/FileServerChannelInitializer.java +++ b/core/src/main/java/org/apache/spark/network/netty/FileServerChannelInitializer.java @@ -35,7 +35,7 @@ class FileServerChannelInitializer extends ChannelInitializer { public void initChannel(SocketChannel channel) { channel.pipeline() .addLast("framer", new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter())) - .addLast("strDecoder", new StringDecoder()) + .addLast("stringDecoder", new StringDecoder()) .addLast("handler", new FileServerHandler(pResolver)); } } diff --git a/core/src/main/java/org/apache/spark/network/netty/FileServerHandler.java b/core/src/main/java/org/apache/spark/network/netty/FileServerHandler.java index f3009b4605..e2d9391b4c 100644 --- a/core/src/main/java/org/apache/spark/network/netty/FileServerHandler.java +++ b/core/src/main/java/org/apache/spark/network/netty/FileServerHandler.java @@ -26,10 +26,14 @@ import io.netty.channel.DefaultFileRegion; import org.apache.spark.storage.BlockId; import org.apache.spark.storage.FileSegment; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; class FileServerHandler extends SimpleChannelInboundHandler { - PathResolver pResolver; + private Logger LOG = LoggerFactory.getLogger(this.getClass().getName()); + + private final PathResolver pResolver; public FileServerHandler(PathResolver pResolver){ this.pResolver = pResolver; @@ -63,7 +67,7 @@ class FileServerHandler extends SimpleChannelInboundHandler { ctx.write(new DefaultFileRegion(new FileInputStream(file) .getChannel(), fileSegment.offset(), fileSegment.length())); } catch (Exception e) { - e.printStackTrace(); + LOG.error("Exception: ", e); } } else { ctx.write(new FileHeader(0, blockId).buffer()); @@ -73,7 +77,7 @@ class FileServerHandler extends SimpleChannelInboundHandler { @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { - cause.printStackTrace(); + LOG.error("Exception: ", cause); ctx.close(); } } -- cgit v1.2.3