From 9e64396ca4c24804d5fd4e96212eed54530ca409 Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Tue, 7 May 2013 18:30:54 -0700 Subject: Cleaned up the Java files from Shane's PR. --- .../main/java/spark/network/netty/FileClient.java | 45 ++++++------- .../netty/FileClientChannelInitializer.java | 11 +--- .../spark/network/netty/FileClientHandler.java | 11 ++-- .../main/java/spark/network/netty/FileServer.java | 73 ++++++++++------------ .../netty/FileServerChannelInitializer.java | 22 +++---- .../spark/network/netty/FileServerHandler.java | 33 +++++----- .../java/spark/network/netty/PathResolver.java | 4 +- core/src/main/scala/spark/storage/DiskStore.scala | 4 +- 8 files changed, 85 insertions(+), 118 deletions(-) diff --git a/core/src/main/java/spark/network/netty/FileClient.java b/core/src/main/java/spark/network/netty/FileClient.java index d0c5081dd2..3a62dacbc8 100644 --- a/core/src/main/java/spark/network/netty/FileClient.java +++ b/core/src/main/java/spark/network/netty/FileClient.java @@ -1,42 +1,40 @@ package spark.network.netty; import io.netty.bootstrap.Bootstrap; -import io.netty.channel.AbstractChannel; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelOption; -import io.netty.channel.nio.NioEventLoopGroup; -import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.channel.oio.OioEventLoopGroup; import io.netty.channel.socket.oio.OioSocketChannel; -import java.util.Arrays; -public class FileClient { +class FileClient { private FileClientHandler handler = null; private Channel channel = null; private Bootstrap bootstrap = null; - public FileClient(FileClientHandler handler){ + public FileClient(FileClientHandler handler) { this.handler = handler; } - - public void init(){ - bootstrap = new Bootstrap(); - bootstrap.group(new OioEventLoopGroup()) + + public void init() { + bootstrap = new Bootstrap(); + bootstrap.group(new OioEventLoopGroup()) .channel(OioSocketChannel.class) .option(ChannelOption.SO_KEEPALIVE, true) .option(ChannelOption.TCP_NODELAY, true) .handler(new FileClientChannelInitializer(handler)); - } + } public static final class ChannelCloseListener implements ChannelFutureListener { private FileClient fc = null; + public ChannelCloseListener(FileClient fc){ this.fc = fc; } + @Override public void operationComplete(ChannelFuture future) { if (fc.bootstrap!=null){ @@ -46,44 +44,39 @@ public class FileClient { } } - public void connect(String host, int port){ + public void connect(String host, int port) { try { - // Start the connection attempt. channel = bootstrap.connect(host, port).sync().channel(); // ChannelFuture cf = channel.closeFuture(); //cf.addListener(new ChannelCloseListener(this)); } catch (InterruptedException e) { close(); - } + } } - - public void waitForClose(){ + + public void waitForClose() { try { channel.closeFuture().sync(); } catch (InterruptedException e){ e.printStackTrace(); } - } + } - public void sendRequest(String file){ + public void sendRequest(String file) { //assert(file == null); //assert(channel == null); - channel.write(file+"\r\n"); + channel.write(file + "\r\n"); } - public void close(){ + public void close() { if(channel != null) { - channel.close(); - channel = null; + channel.close(); + channel = null; } if ( bootstrap!=null) { bootstrap.shutdown(); bootstrap = null; } } - - } - - diff --git a/core/src/main/java/spark/network/netty/FileClientChannelInitializer.java b/core/src/main/java/spark/network/netty/FileClientChannelInitializer.java index 50e5704619..af25baf641 100644 --- a/core/src/main/java/spark/network/netty/FileClientChannelInitializer.java +++ b/core/src/main/java/spark/network/netty/FileClientChannelInitializer.java @@ -3,15 +3,10 @@ package spark.network.netty; import io.netty.buffer.BufType; import io.netty.channel.ChannelInitializer; import io.netty.channel.socket.SocketChannel; -import io.netty.handler.codec.LengthFieldBasedFrameDecoder; import io.netty.handler.codec.string.StringEncoder; -import io.netty.util.CharsetUtil; -import io.netty.handler.logging.LoggingHandler; -import io.netty.handler.logging.LogLevel; -public class FileClientChannelInitializer extends - ChannelInitializer { +class FileClientChannelInitializer extends ChannelInitializer { private FileClientHandler fhandler; @@ -23,7 +18,7 @@ public class FileClientChannelInitializer extends public void initChannel(SocketChannel channel) { // file no more than 2G channel.pipeline() - .addLast("encoder", new StringEncoder(BufType.BYTE)) - .addLast("handler", fhandler); + .addLast("encoder", new StringEncoder(BufType.BYTE)) + .addLast("handler", fhandler); } } diff --git a/core/src/main/java/spark/network/netty/FileClientHandler.java b/core/src/main/java/spark/network/netty/FileClientHandler.java index 911c8b32b5..2069dee5ca 100644 --- a/core/src/main/java/spark/network/netty/FileClientHandler.java +++ b/core/src/main/java/spark/network/netty/FileClientHandler.java @@ -3,12 +3,9 @@ package spark.network.netty; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundByteHandlerAdapter; -import io.netty.util.CharsetUtil; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.logging.Logger; -public abstract class FileClientHandler extends ChannelInboundByteHandlerAdapter { +abstract class FileClientHandler extends ChannelInboundByteHandlerAdapter { private FileHeader currentHeader = null; @@ -19,7 +16,7 @@ public abstract class FileClientHandler extends ChannelInboundByteHandlerAdapter // Use direct buffer if possible. return ctx.alloc().ioBuffer(); } - + @Override public void inboundBufferUpdated(ChannelHandlerContext ctx, ByteBuf in) { // get header @@ -27,8 +24,8 @@ public abstract class FileClientHandler extends ChannelInboundByteHandlerAdapter currentHeader = FileHeader.create(in.readBytes(FileHeader.HEADER_SIZE())); } // get file - if(in.readableBytes() >= currentHeader.fileLen()){ - handle(ctx,in,currentHeader); + if(in.readableBytes() >= currentHeader.fileLen()) { + handle(ctx, in, currentHeader); currentHeader = null; ctx.close(); } diff --git a/core/src/main/java/spark/network/netty/FileServer.java b/core/src/main/java/spark/network/netty/FileServer.java index 38af305096..647b26bf8a 100644 --- a/core/src/main/java/spark/network/netty/FileServer.java +++ b/core/src/main/java/spark/network/netty/FileServer.java @@ -1,58 +1,51 @@ package spark.network.netty; -import java.io.File; import io.netty.bootstrap.ServerBootstrap; -import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelOption; import io.netty.channel.Channel; -import io.netty.channel.nio.NioEventLoopGroup; -import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.channel.oio.OioEventLoopGroup; import io.netty.channel.socket.oio.OioServerSocketChannel; -import io.netty.handler.logging.LogLevel; -import io.netty.handler.logging.LoggingHandler; + /** * Server that accept the path of a file an echo back its content. */ -public class FileServer { +class FileServer { + + private ServerBootstrap bootstrap = null; + private Channel channel = null; + private PathResolver pResolver; - private ServerBootstrap bootstrap = null; - private Channel channel = null; - private PathResolver pResolver; + public FileServer(PathResolver pResolver) { + this.pResolver = pResolver; + } - public FileServer(PathResolver pResolver){ - this.pResolver = pResolver; + public void run(int port) { + // Configure the server. + bootstrap = new ServerBootstrap(); + try { + bootstrap.group(new OioEventLoopGroup(), new OioEventLoopGroup()) + .channel(OioServerSocketChannel.class) + .option(ChannelOption.SO_BACKLOG, 100) + .option(ChannelOption.SO_RCVBUF, 1500) + .childHandler(new FileServerChannelInitializer(pResolver)); + // Start the server. + channel = bootstrap.bind(port).sync().channel(); + channel.closeFuture().sync(); + } catch (InterruptedException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } finally{ + bootstrap.shutdown(); } + } - public void run(int port) { - // Configure the server. - bootstrap = new ServerBootstrap(); - try { - bootstrap.group(new OioEventLoopGroup(), new OioEventLoopGroup()) - .channel(OioServerSocketChannel.class) - .option(ChannelOption.SO_BACKLOG, 100) - .option(ChannelOption.SO_RCVBUF, 1500) - .childHandler(new FileServerChannelInitializer(pResolver)); - // Start the server. - channel = bootstrap.bind(port).sync().channel(); - channel.closeFuture().sync(); - } catch (InterruptedException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } finally{ - bootstrap.shutdown(); - } + public void stop() { + if (channel!=null) { + channel.close(); } - - public void stop(){ - if (channel!=null){ - channel.close(); - } - if (bootstrap != null){ - bootstrap.shutdown(); - } + if (bootstrap != null) { + bootstrap.shutdown(); } + } } - - diff --git a/core/src/main/java/spark/network/netty/FileServerChannelInitializer.java b/core/src/main/java/spark/network/netty/FileServerChannelInitializer.java index 9d0618ff1c..8f1f5c65cd 100644 --- a/core/src/main/java/spark/network/netty/FileServerChannelInitializer.java +++ b/core/src/main/java/spark/network/netty/FileServerChannelInitializer.java @@ -1,21 +1,15 @@ package spark.network.netty; -import java.io.File; -import io.netty.buffer.BufType; import io.netty.channel.ChannelInitializer; import io.netty.channel.socket.SocketChannel; -import io.netty.handler.codec.string.StringDecoder; -import io.netty.handler.codec.string.StringEncoder; import io.netty.handler.codec.DelimiterBasedFrameDecoder; import io.netty.handler.codec.Delimiters; -import io.netty.util.CharsetUtil; -import io.netty.handler.logging.LoggingHandler; -import io.netty.handler.logging.LogLevel; +import io.netty.handler.codec.string.StringDecoder; + -public class FileServerChannelInitializer extends - ChannelInitializer { +class FileServerChannelInitializer extends ChannelInitializer { - PathResolver pResolver; + PathResolver pResolver; public FileServerChannelInitializer(PathResolver pResolver) { this.pResolver = pResolver; @@ -24,10 +18,8 @@ public class FileServerChannelInitializer extends @Override public void initChannel(SocketChannel channel) { channel.pipeline() - .addLast("framer", new DelimiterBasedFrameDecoder( - 8192, Delimiters.lineDelimiter())) - .addLast("strDecoder", new StringDecoder()) - .addLast("handler", new FileServerHandler(pResolver)); - + .addLast("framer", new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter())) + .addLast("strDecoder", new StringDecoder()) + .addLast("handler", new FileServerHandler(pResolver)); } } diff --git a/core/src/main/java/spark/network/netty/FileServerHandler.java b/core/src/main/java/spark/network/netty/FileServerHandler.java index e1083e87a2..a78eddb1b5 100644 --- a/core/src/main/java/spark/network/netty/FileServerHandler.java +++ b/core/src/main/java/spark/network/netty/FileServerHandler.java @@ -1,17 +1,17 @@ package spark.network.netty; +import java.io.File; +import java.io.FileInputStream; + import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundMessageHandlerAdapter; import io.netty.channel.DefaultFileRegion; -import io.netty.handler.stream.ChunkedFile; -import java.io.File; -import java.io.FileInputStream; -public class FileServerHandler extends - ChannelInboundMessageHandlerAdapter { - PathResolver pResolver; - +class FileServerHandler extends ChannelInboundMessageHandlerAdapter { + + PathResolver pResolver; + public FileServerHandler(PathResolver pResolver){ this.pResolver = pResolver; } @@ -21,8 +21,8 @@ public class FileServerHandler extends String path = pResolver.getAbsolutePath(blockId); // if getFilePath returns null, close the channel if (path == null) { - //ctx.close(); - return; + //ctx.close(); + return; } File file = new File(path); if (file.exists()) { @@ -33,23 +33,21 @@ public class FileServerHandler extends return; } long length = file.length(); - if (length > Integer.MAX_VALUE || length <= 0 ) { + if (length > Integer.MAX_VALUE || length <= 0) { //logger.info("too large file : " + file.getAbsolutePath() + " of size "+ length); ctx.write(new FileHeader(0, blockId).buffer()); ctx.flush(); - return; + return; } int len = new Long(length).intValue(); //logger.info("Sending block "+blockId+" filelen = "+len); //logger.info("header = "+ (new FileHeader(len, blockId)).buffer()); ctx.write((new FileHeader(len, blockId)).buffer()); try { - ctx.sendFile(new DefaultFileRegion(new FileInputStream(file) - .getChannel(), 0, file.length())); + ctx.sendFile(new DefaultFileRegion(new FileInputStream(file) + .getChannel(), 0, file.length())); } catch (Exception e) { - // TODO Auto-generated catch block - //logger.warning("Exception when sending file : " - //+ file.getAbsolutePath()); + //logger.warning("Exception when sending file : " + file.getAbsolutePath()); e.printStackTrace(); } } else { @@ -58,8 +56,7 @@ public class FileServerHandler extends } ctx.flush(); } - - + @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); diff --git a/core/src/main/java/spark/network/netty/PathResolver.java b/core/src/main/java/spark/network/netty/PathResolver.java index 5d5eda006e..302411672c 100755 --- a/core/src/main/java/spark/network/netty/PathResolver.java +++ b/core/src/main/java/spark/network/netty/PathResolver.java @@ -1,12 +1,12 @@ package spark.network.netty; + public interface PathResolver { /** * Get the absolute path of the file - * + * * @param fileId * @return the absolute path of file */ public String getAbsolutePath(String fileId); - } diff --git a/core/src/main/scala/spark/storage/DiskStore.scala b/core/src/main/scala/spark/storage/DiskStore.scala index 82bcbd5bc2..be33d4260e 100644 --- a/core/src/main/scala/spark/storage/DiskStore.scala +++ b/core/src/main/scala/spark/storage/DiskStore.scala @@ -288,7 +288,7 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String) val port = System.getProperty("spark.shuffle.sender.port", "6653").toInt val pResolver = new PathResolver { - def getAbsolutePath(blockId:String):String = { + override def getAbsolutePath(blockId: String): String = { if (!blockId.startsWith("shuffle_")) { return null } @@ -298,7 +298,7 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String) shuffleSender = new Thread { override def run() = { val sender = new ShuffleSender(port,pResolver) - logInfo("created ShuffleSender binding to port : "+ port) + logInfo("Created ShuffleSender binding to port : "+ port) sender.start } } -- cgit v1.2.3