aboutsummaryrefslogtreecommitdiff
path: root/core/src/main/java
diff options
context:
space:
mode:
authorshane-huang <shengsheng.huang@intel.com>2013-02-20 11:51:13 +0800
committershane-huang <shengsheng.huang@intel.com>2013-04-07 14:37:12 +0800
commitdf47b40b764e25cbd10ce49d7152e1d33f51a263 (patch)
tree46b3efea5434f02a5ee87c0b31c3bb59c74b773c /core/src/main/java
parentdfe98ca798d84e6847330012f1332d9271156534 (diff)
downloadspark-df47b40b764e25cbd10ce49d7152e1d33f51a263.tar.gz
spark-df47b40b764e25cbd10ce49d7152e1d33f51a263.tar.bz2
spark-df47b40b764e25cbd10ce49d7152e1d33f51a263.zip
Shuffle Performance fix: Use netty embeded OIO file server instead of ConnectionManager
Shuffle Performance Optimization: do not send 0-byte block requests to reduce network messages change reference from io.Source to scala.io.Source to avoid looking into io.netty package Signed-off-by: shane-huang <shengsheng.huang@intel.com>
Diffstat (limited to 'core/src/main/java')
-rw-r--r--core/src/main/java/spark/network/netty/FileClient.java89
-rw-r--r--core/src/main/java/spark/network/netty/FileClientChannelInitializer.java29
-rw-r--r--core/src/main/java/spark/network/netty/FileClientHandler.java38
-rw-r--r--core/src/main/java/spark/network/netty/FileServer.java59
-rw-r--r--core/src/main/java/spark/network/netty/FileServerChannelInitializer.java33
-rw-r--r--core/src/main/java/spark/network/netty/FileServerHandler.java68
-rwxr-xr-xcore/src/main/java/spark/network/netty/PathResolver.java12
7 files changed, 328 insertions, 0 deletions
diff --git a/core/src/main/java/spark/network/netty/FileClient.java b/core/src/main/java/spark/network/netty/FileClient.java
new file mode 100644
index 0000000000..d0c5081dd2
--- /dev/null
+++ b/core/src/main/java/spark/network/netty/FileClient.java
@@ -0,0 +1,89 @@
+package spark.network.netty;
+
+import io.netty.bootstrap.Bootstrap;
+import io.netty.channel.AbstractChannel;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.channel.oio.OioEventLoopGroup;
+import io.netty.channel.socket.oio.OioSocketChannel;
+
+import java.util.Arrays;
+
+public class FileClient {
+
+ private FileClientHandler handler = null;
+ private Channel channel = null;
+ private Bootstrap bootstrap = null;
+
+ public FileClient(FileClientHandler handler){
+ this.handler = handler;
+ }
+
+ public void init(){
+ bootstrap = new Bootstrap();
+ bootstrap.group(new OioEventLoopGroup())
+ .channel(OioSocketChannel.class)
+ .option(ChannelOption.SO_KEEPALIVE, true)
+ .option(ChannelOption.TCP_NODELAY, true)
+ .handler(new FileClientChannelInitializer(handler));
+ }
+
+ public static final class ChannelCloseListener implements ChannelFutureListener {
+ private FileClient fc = null;
+ public ChannelCloseListener(FileClient fc){
+ this.fc = fc;
+ }
+ @Override
+ public void operationComplete(ChannelFuture future) {
+ if (fc.bootstrap!=null){
+ fc.bootstrap.shutdown();
+ fc.bootstrap = null;
+ }
+ }
+ }
+
+ public void connect(String host, int port){
+ try {
+
+ // Start the connection attempt.
+ channel = bootstrap.connect(host, port).sync().channel();
+ // ChannelFuture cf = channel.closeFuture();
+ //cf.addListener(new ChannelCloseListener(this));
+ } catch (InterruptedException e) {
+ close();
+ }
+ }
+
+ public void waitForClose(){
+ try {
+ channel.closeFuture().sync();
+ } catch (InterruptedException e){
+ e.printStackTrace();
+ }
+ }
+
+ public void sendRequest(String file){
+ //assert(file == null);
+ //assert(channel == null);
+ channel.write(file+"\r\n");
+ }
+
+ public void close(){
+ if(channel != null) {
+ channel.close();
+ channel = null;
+ }
+ if ( bootstrap!=null) {
+ bootstrap.shutdown();
+ bootstrap = null;
+ }
+ }
+
+
+}
+
+
diff --git a/core/src/main/java/spark/network/netty/FileClientChannelInitializer.java b/core/src/main/java/spark/network/netty/FileClientChannelInitializer.java
new file mode 100644
index 0000000000..50e5704619
--- /dev/null
+++ b/core/src/main/java/spark/network/netty/FileClientChannelInitializer.java
@@ -0,0 +1,29 @@
+package spark.network.netty;
+
+import io.netty.buffer.BufType;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
+import io.netty.handler.codec.string.StringEncoder;
+import io.netty.util.CharsetUtil;
+
+import io.netty.handler.logging.LoggingHandler;
+import io.netty.handler.logging.LogLevel;
+
+public class FileClientChannelInitializer extends
+ ChannelInitializer<SocketChannel> {
+
+ private FileClientHandler fhandler;
+
+ public FileClientChannelInitializer(FileClientHandler handler) {
+ fhandler = handler;
+ }
+
+ @Override
+ public void initChannel(SocketChannel channel) {
+ // file no more than 2G
+ channel.pipeline()
+ .addLast("encoder", new StringEncoder(BufType.BYTE))
+ .addLast("handler", fhandler);
+ }
+}
diff --git a/core/src/main/java/spark/network/netty/FileClientHandler.java b/core/src/main/java/spark/network/netty/FileClientHandler.java
new file mode 100644
index 0000000000..911c8b32b5
--- /dev/null
+++ b/core/src/main/java/spark/network/netty/FileClientHandler.java
@@ -0,0 +1,38 @@
+package spark.network.netty;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundByteHandlerAdapter;
+import io.netty.util.CharsetUtil;
+
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.logging.Logger;
+
+public abstract class FileClientHandler extends ChannelInboundByteHandlerAdapter {
+
+ private FileHeader currentHeader = null;
+
+ public abstract void handle(ChannelHandlerContext ctx, ByteBuf in, FileHeader header);
+
+ @Override
+ public ByteBuf newInboundBuffer(ChannelHandlerContext ctx) {
+ // Use direct buffer if possible.
+ return ctx.alloc().ioBuffer();
+ }
+
+ @Override
+ public void inboundBufferUpdated(ChannelHandlerContext ctx, ByteBuf in) {
+ // get header
+ if (currentHeader == null && in.readableBytes() >= FileHeader.HEADER_SIZE()) {
+ currentHeader = FileHeader.create(in.readBytes(FileHeader.HEADER_SIZE()));
+ }
+ // get file
+ if(in.readableBytes() >= currentHeader.fileLen()){
+ handle(ctx,in,currentHeader);
+ currentHeader = null;
+ ctx.close();
+ }
+ }
+
+}
+
diff --git a/core/src/main/java/spark/network/netty/FileServer.java b/core/src/main/java/spark/network/netty/FileServer.java
new file mode 100644
index 0000000000..729e45f0a1
--- /dev/null
+++ b/core/src/main/java/spark/network/netty/FileServer.java
@@ -0,0 +1,59 @@
+package spark.network.netty;
+
+import java.io.File;
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.Channel;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.channel.oio.OioEventLoopGroup;
+import io.netty.channel.socket.oio.OioServerSocketChannel;
+import io.netty.handler.logging.LogLevel;
+import io.netty.handler.logging.LoggingHandler;
+
+/**
+ * Server that accept the path of a file an echo back its content.
+ */
+public class FileServer {
+
+ private ServerBootstrap bootstrap = null;
+ private Channel channel = null;
+ private PathResolver pResolver;
+
+ public FileServer(PathResolver pResolver){
+ this.pResolver = pResolver;
+ }
+
+ public void run(int port) {
+ // Configure the server.
+ bootstrap = new ServerBootstrap();
+ try {
+ bootstrap.group(new OioEventLoopGroup(), new OioEventLoopGroup())
+ .channel(OioServerSocketChannel.class)
+ .option(ChannelOption.SO_BACKLOG, 100)
+ .option(ChannelOption.SO_RCVBUF, 1500)
+ .childHandler(new FileServerChannelInitializer(pResolver));
+ // Start the server.
+ channel = bootstrap.bind(port).sync().channel();
+ channel.closeFuture().sync();
+ } catch (InterruptedException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ } finally{
+ bootstrap.shutdown();
+ }
+ }
+
+ public void stop(){
+ if (channel!=null){
+ channel.close();
+ }
+ if (bootstrap != null){
+ bootstrap.shutdown();
+ bootstrap = null;
+ }
+ }
+}
+
+
diff --git a/core/src/main/java/spark/network/netty/FileServerChannelInitializer.java b/core/src/main/java/spark/network/netty/FileServerChannelInitializer.java
new file mode 100644
index 0000000000..9d0618ff1c
--- /dev/null
+++ b/core/src/main/java/spark/network/netty/FileServerChannelInitializer.java
@@ -0,0 +1,33 @@
+package spark.network.netty;
+
+import java.io.File;
+import io.netty.buffer.BufType;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.handler.codec.string.StringDecoder;
+import io.netty.handler.codec.string.StringEncoder;
+import io.netty.handler.codec.DelimiterBasedFrameDecoder;
+import io.netty.handler.codec.Delimiters;
+import io.netty.util.CharsetUtil;
+import io.netty.handler.logging.LoggingHandler;
+import io.netty.handler.logging.LogLevel;
+
+public class FileServerChannelInitializer extends
+ ChannelInitializer<SocketChannel> {
+
+ PathResolver pResolver;
+
+ public FileServerChannelInitializer(PathResolver pResolver) {
+ this.pResolver = pResolver;
+ }
+
+ @Override
+ public void initChannel(SocketChannel channel) {
+ channel.pipeline()
+ .addLast("framer", new DelimiterBasedFrameDecoder(
+ 8192, Delimiters.lineDelimiter()))
+ .addLast("strDecoder", new StringDecoder())
+ .addLast("handler", new FileServerHandler(pResolver));
+
+ }
+}
diff --git a/core/src/main/java/spark/network/netty/FileServerHandler.java b/core/src/main/java/spark/network/netty/FileServerHandler.java
new file mode 100644
index 0000000000..e1083e87a2
--- /dev/null
+++ b/core/src/main/java/spark/network/netty/FileServerHandler.java
@@ -0,0 +1,68 @@
+package spark.network.netty;
+
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundMessageHandlerAdapter;
+import io.netty.channel.DefaultFileRegion;
+import io.netty.handler.stream.ChunkedFile;
+import java.io.File;
+import java.io.FileInputStream;
+
+public class FileServerHandler extends
+ ChannelInboundMessageHandlerAdapter<String> {
+
+ PathResolver pResolver;
+
+ public FileServerHandler(PathResolver pResolver){
+ this.pResolver = pResolver;
+ }
+
+ @Override
+ public void messageReceived(ChannelHandlerContext ctx, String blockId) {
+ String path = pResolver.getAbsolutePath(blockId);
+ // if getFilePath returns null, close the channel
+ if (path == null) {
+ //ctx.close();
+ return;
+ }
+ File file = new File(path);
+ if (file.exists()) {
+ if (!file.isFile()) {
+ //logger.info("Not a file : " + file.getAbsolutePath());
+ ctx.write(new FileHeader(0, blockId).buffer());
+ ctx.flush();
+ return;
+ }
+ long length = file.length();
+ if (length > Integer.MAX_VALUE || length <= 0 ) {
+ //logger.info("too large file : " + file.getAbsolutePath() + " of size "+ length);
+ ctx.write(new FileHeader(0, blockId).buffer());
+ ctx.flush();
+ return;
+ }
+ int len = new Long(length).intValue();
+ //logger.info("Sending block "+blockId+" filelen = "+len);
+ //logger.info("header = "+ (new FileHeader(len, blockId)).buffer());
+ ctx.write((new FileHeader(len, blockId)).buffer());
+ try {
+ ctx.sendFile(new DefaultFileRegion(new FileInputStream(file)
+ .getChannel(), 0, file.length()));
+ } catch (Exception e) {
+ // TODO Auto-generated catch block
+ //logger.warning("Exception when sending file : "
+ //+ file.getAbsolutePath());
+ e.printStackTrace();
+ }
+ } else {
+ //logger.warning("File not found: " + file.getAbsolutePath());
+ ctx.write(new FileHeader(0, blockId).buffer());
+ }
+ ctx.flush();
+ }
+
+
+ @Override
+ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
+ cause.printStackTrace();
+ ctx.close();
+ }
+}
diff --git a/core/src/main/java/spark/network/netty/PathResolver.java b/core/src/main/java/spark/network/netty/PathResolver.java
new file mode 100755
index 0000000000..5d5eda006e
--- /dev/null
+++ b/core/src/main/java/spark/network/netty/PathResolver.java
@@ -0,0 +1,12 @@
+package spark.network.netty;
+
+public interface PathResolver {
+ /**
+ * Get the absolute path of the file
+ *
+ * @param fileId
+ * @return the absolute path of file
+ */
+ public String getAbsolutePath(String fileId);
+
+}