aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorBinh Nguyen <ngbinh@gmail.com>2013-12-27 11:18:27 -0800
committerBinh Nguyen <ngbinh@gmail.com>2013-12-27 11:24:30 -0800
commit2c5bade4ee6db747cbc7b0884094ad443834e3b1 (patch)
tree4469e518f8e096720240e1b10e51c1791b8ef4d4 /core
parent786f393a98f8771d0c20322cd50e553a895c7d60 (diff)
downloadspark-2c5bade4ee6db747cbc7b0884094ad443834e3b1.tar.gz
spark-2c5bade4ee6db747cbc7b0884094ad443834e3b1.tar.bz2
spark-2c5bade4ee6db747cbc7b0884094ad443834e3b1.zip
Fix failed unit tests
Also clean up a bit.
Diffstat (limited to 'core')
-rw-r--r--core/src/main/java/org/apache/spark/network/netty/FileClient.java25
-rw-r--r--core/src/main/java/org/apache/spark/network/netty/FileServerChannelInitializer.java2
-rw-r--r--core/src/main/java/org/apache/spark/network/netty/FileServerHandler.java10
3 files changed, 24 insertions, 13 deletions
diff --git a/core/src/main/java/org/apache/spark/network/netty/FileClient.java b/core/src/main/java/org/apache/spark/network/netty/FileClient.java
index 6b7f6a9397..46d61503bc 100644
--- a/core/src/main/java/org/apache/spark/network/netty/FileClient.java
+++ b/core/src/main/java/org/apache/spark/network/netty/FileClient.java
@@ -27,14 +27,17 @@ import io.netty.channel.socket.oio.OioSocketChannel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.concurrent.TimeUnit;
+
class FileClient {
private Logger LOG = LoggerFactory.getLogger(this.getClass().getName());
- private FileClientHandler handler = null;
+ private final FileClientHandler handler;
private Channel channel = null;
private Bootstrap bootstrap = null;
private EventLoopGroup group = null;
- private int connectTimeout = 60*1000; // 1 min
+ private final int connectTimeout;
+ private final int sendTimeout = 60; // 1 min
public FileClient(FileClientHandler handler, int connectTimeout) {
this.handler = handler;
@@ -43,7 +46,7 @@ class FileClient {
public void init() {
group = new OioEventLoopGroup();
- Bootstrap bootstrap = new Bootstrap();
+ bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(OioSocketChannel.class)
.option(ChannelOption.SO_KEEPALIVE, true)
@@ -59,6 +62,7 @@ class FileClient {
// ChannelFuture cf = channel.closeFuture();
//cf.addListener(new ChannelCloseListener(this));
} catch (InterruptedException e) {
+ LOG.warn("FileClient interrupted while trying to connect", e);
close();
}
}
@@ -74,15 +78,18 @@ class FileClient {
public void sendRequest(String file) {
//assert(file == null);
//assert(channel == null);
- channel.write(file + "\r\n");
+ try {
+ // Should be able to send the message to network link channel.
+ boolean bSent = channel.writeAndFlush(file + "\r\n").await(sendTimeout, TimeUnit.SECONDS);
+ if (!bSent) {
+ throw new RuntimeException("Failed to send");
+ }
+ } catch (InterruptedException e) {
+ LOG.error("Error", e);
+ }
}
public void close() {
- if(channel != null) {
- channel.close().awaitUninterruptibly();
- channel = null;
- }
-
if (group != null) {
group.shutdownGracefully();
group = null;
diff --git a/core/src/main/java/org/apache/spark/network/netty/FileServerChannelInitializer.java b/core/src/main/java/org/apache/spark/network/netty/FileServerChannelInitializer.java
index f1f264c583..3f15ff898f 100644
--- a/core/src/main/java/org/apache/spark/network/netty/FileServerChannelInitializer.java
+++ b/core/src/main/java/org/apache/spark/network/netty/FileServerChannelInitializer.java
@@ -35,7 +35,7 @@ class FileServerChannelInitializer extends ChannelInitializer<SocketChannel> {
public void initChannel(SocketChannel channel) {
channel.pipeline()
.addLast("framer", new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()))
- .addLast("strDecoder", new StringDecoder())
+ .addLast("stringDecoder", new StringDecoder())
.addLast("handler", new FileServerHandler(pResolver));
}
}
diff --git a/core/src/main/java/org/apache/spark/network/netty/FileServerHandler.java b/core/src/main/java/org/apache/spark/network/netty/FileServerHandler.java
index f3009b4605..e2d9391b4c 100644
--- a/core/src/main/java/org/apache/spark/network/netty/FileServerHandler.java
+++ b/core/src/main/java/org/apache/spark/network/netty/FileServerHandler.java
@@ -26,10 +26,14 @@ import io.netty.channel.DefaultFileRegion;
import org.apache.spark.storage.BlockId;
import org.apache.spark.storage.FileSegment;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
class FileServerHandler extends SimpleChannelInboundHandler<String> {
- PathResolver pResolver;
+ private Logger LOG = LoggerFactory.getLogger(this.getClass().getName());
+
+ private final PathResolver pResolver;
public FileServerHandler(PathResolver pResolver){
this.pResolver = pResolver;
@@ -63,7 +67,7 @@ class FileServerHandler extends SimpleChannelInboundHandler<String> {
ctx.write(new DefaultFileRegion(new FileInputStream(file)
.getChannel(), fileSegment.offset(), fileSegment.length()));
} catch (Exception e) {
- e.printStackTrace();
+ LOG.error("Exception: ", e);
}
} else {
ctx.write(new FileHeader(0, blockId).buffer());
@@ -73,7 +77,7 @@ class FileServerHandler extends SimpleChannelInboundHandler<String> {
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
- cause.printStackTrace();
+ LOG.error("Exception: ", cause);
ctx.close();
}
}