aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorReynold Xin <rxin@cs.berkeley.edu>2013-05-07 18:30:54 -0700
committerReynold Xin <rxin@cs.berkeley.edu>2013-05-07 18:30:54 -0700
commit9e64396ca4c24804d5fd4e96212eed54530ca409 (patch)
tree083fdfd98dac20a764e9f01ba7fd551616ee5467
parent0e5cc30868bcf933f2980c4cfe29abc3d8fe5887 (diff)
downloadspark-9e64396ca4c24804d5fd4e96212eed54530ca409.tar.gz
spark-9e64396ca4c24804d5fd4e96212eed54530ca409.tar.bz2
spark-9e64396ca4c24804d5fd4e96212eed54530ca409.zip
Cleaned up the Java files from Shane's PR.
-rw-r--r--core/src/main/java/spark/network/netty/FileClient.java45
-rw-r--r--core/src/main/java/spark/network/netty/FileClientChannelInitializer.java11
-rw-r--r--core/src/main/java/spark/network/netty/FileClientHandler.java11
-rw-r--r--core/src/main/java/spark/network/netty/FileServer.java73
-rw-r--r--core/src/main/java/spark/network/netty/FileServerChannelInitializer.java22
-rw-r--r--core/src/main/java/spark/network/netty/FileServerHandler.java33
-rwxr-xr-xcore/src/main/java/spark/network/netty/PathResolver.java4
-rw-r--r--core/src/main/scala/spark/storage/DiskStore.scala4
8 files changed, 85 insertions, 118 deletions
diff --git a/core/src/main/java/spark/network/netty/FileClient.java b/core/src/main/java/spark/network/netty/FileClient.java
index d0c5081dd2..3a62dacbc8 100644
--- a/core/src/main/java/spark/network/netty/FileClient.java
+++ b/core/src/main/java/spark/network/netty/FileClient.java
@@ -1,42 +1,40 @@
package spark.network.netty;
import io.netty.bootstrap.Bootstrap;
-import io.netty.channel.AbstractChannel;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelOption;
-import io.netty.channel.nio.NioEventLoopGroup;
-import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.channel.oio.OioEventLoopGroup;
import io.netty.channel.socket.oio.OioSocketChannel;
-import java.util.Arrays;
-public class FileClient {
+class FileClient {
private FileClientHandler handler = null;
private Channel channel = null;
private Bootstrap bootstrap = null;
- public FileClient(FileClientHandler handler){
+ public FileClient(FileClientHandler handler) {
this.handler = handler;
}
-
- public void init(){
- bootstrap = new Bootstrap();
- bootstrap.group(new OioEventLoopGroup())
+
+ public void init() {
+ bootstrap = new Bootstrap();
+ bootstrap.group(new OioEventLoopGroup())
.channel(OioSocketChannel.class)
.option(ChannelOption.SO_KEEPALIVE, true)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new FileClientChannelInitializer(handler));
- }
+ }
public static final class ChannelCloseListener implements ChannelFutureListener {
private FileClient fc = null;
+
public ChannelCloseListener(FileClient fc){
this.fc = fc;
}
+
@Override
public void operationComplete(ChannelFuture future) {
if (fc.bootstrap!=null){
@@ -46,44 +44,39 @@ public class FileClient {
}
}
- public void connect(String host, int port){
+ public void connect(String host, int port) {
try {
-
// Start the connection attempt.
channel = bootstrap.connect(host, port).sync().channel();
// ChannelFuture cf = channel.closeFuture();
//cf.addListener(new ChannelCloseListener(this));
} catch (InterruptedException e) {
close();
- }
+ }
}
-
- public void waitForClose(){
+
+ public void waitForClose() {
try {
channel.closeFuture().sync();
} catch (InterruptedException e){
e.printStackTrace();
}
- }
+ }
- public void sendRequest(String file){
+ public void sendRequest(String file) {
//assert(file == null);
//assert(channel == null);
- channel.write(file+"\r\n");
+ channel.write(file + "\r\n");
}
- public void close(){
+ public void close() {
if(channel != null) {
- channel.close();
- channel = null;
+ channel.close();
+ channel = null;
}
if ( bootstrap!=null) {
bootstrap.shutdown();
bootstrap = null;
}
}
-
-
}
-
-
diff --git a/core/src/main/java/spark/network/netty/FileClientChannelInitializer.java b/core/src/main/java/spark/network/netty/FileClientChannelInitializer.java
index 50e5704619..af25baf641 100644
--- a/core/src/main/java/spark/network/netty/FileClientChannelInitializer.java
+++ b/core/src/main/java/spark/network/netty/FileClientChannelInitializer.java
@@ -3,15 +3,10 @@ package spark.network.netty;
import io.netty.buffer.BufType;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
-import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.string.StringEncoder;
-import io.netty.util.CharsetUtil;
-import io.netty.handler.logging.LoggingHandler;
-import io.netty.handler.logging.LogLevel;
-public class FileClientChannelInitializer extends
- ChannelInitializer<SocketChannel> {
+class FileClientChannelInitializer extends ChannelInitializer<SocketChannel> {
private FileClientHandler fhandler;
@@ -23,7 +18,7 @@ public class FileClientChannelInitializer extends
public void initChannel(SocketChannel channel) {
// file no more than 2G
channel.pipeline()
- .addLast("encoder", new StringEncoder(BufType.BYTE))
- .addLast("handler", fhandler);
+ .addLast("encoder", new StringEncoder(BufType.BYTE))
+ .addLast("handler", fhandler);
}
}
diff --git a/core/src/main/java/spark/network/netty/FileClientHandler.java b/core/src/main/java/spark/network/netty/FileClientHandler.java
index 911c8b32b5..2069dee5ca 100644
--- a/core/src/main/java/spark/network/netty/FileClientHandler.java
+++ b/core/src/main/java/spark/network/netty/FileClientHandler.java
@@ -3,12 +3,9 @@ package spark.network.netty;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundByteHandlerAdapter;
-import io.netty.util.CharsetUtil;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.logging.Logger;
-public abstract class FileClientHandler extends ChannelInboundByteHandlerAdapter {
+abstract class FileClientHandler extends ChannelInboundByteHandlerAdapter {
private FileHeader currentHeader = null;
@@ -19,7 +16,7 @@ public abstract class FileClientHandler extends ChannelInboundByteHandlerAdapter
// Use direct buffer if possible.
return ctx.alloc().ioBuffer();
}
-
+
@Override
public void inboundBufferUpdated(ChannelHandlerContext ctx, ByteBuf in) {
// get header
@@ -27,8 +24,8 @@ public abstract class FileClientHandler extends ChannelInboundByteHandlerAdapter
currentHeader = FileHeader.create(in.readBytes(FileHeader.HEADER_SIZE()));
}
// get file
- if(in.readableBytes() >= currentHeader.fileLen()){
- handle(ctx,in,currentHeader);
+ if(in.readableBytes() >= currentHeader.fileLen()) {
+ handle(ctx, in, currentHeader);
currentHeader = null;
ctx.close();
}
diff --git a/core/src/main/java/spark/network/netty/FileServer.java b/core/src/main/java/spark/network/netty/FileServer.java
index 38af305096..647b26bf8a 100644
--- a/core/src/main/java/spark/network/netty/FileServer.java
+++ b/core/src/main/java/spark/network/netty/FileServer.java
@@ -1,58 +1,51 @@
package spark.network.netty;
-import java.io.File;
import io.netty.bootstrap.ServerBootstrap;
-import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.Channel;
-import io.netty.channel.nio.NioEventLoopGroup;
-import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.oio.OioEventLoopGroup;
import io.netty.channel.socket.oio.OioServerSocketChannel;
-import io.netty.handler.logging.LogLevel;
-import io.netty.handler.logging.LoggingHandler;
+
/**
* Server that accept the path of a file an echo back its content.
*/
-public class FileServer {
+class FileServer {
+
+ private ServerBootstrap bootstrap = null;
+ private Channel channel = null;
+ private PathResolver pResolver;
- private ServerBootstrap bootstrap = null;
- private Channel channel = null;
- private PathResolver pResolver;
+ public FileServer(PathResolver pResolver) {
+ this.pResolver = pResolver;
+ }
- public FileServer(PathResolver pResolver){
- this.pResolver = pResolver;
+ public void run(int port) {
+ // Configure the server.
+ bootstrap = new ServerBootstrap();
+ try {
+ bootstrap.group(new OioEventLoopGroup(), new OioEventLoopGroup())
+ .channel(OioServerSocketChannel.class)
+ .option(ChannelOption.SO_BACKLOG, 100)
+ .option(ChannelOption.SO_RCVBUF, 1500)
+ .childHandler(new FileServerChannelInitializer(pResolver));
+ // Start the server.
+ channel = bootstrap.bind(port).sync().channel();
+ channel.closeFuture().sync();
+ } catch (InterruptedException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ } finally{
+ bootstrap.shutdown();
}
+ }
- public void run(int port) {
- // Configure the server.
- bootstrap = new ServerBootstrap();
- try {
- bootstrap.group(new OioEventLoopGroup(), new OioEventLoopGroup())
- .channel(OioServerSocketChannel.class)
- .option(ChannelOption.SO_BACKLOG, 100)
- .option(ChannelOption.SO_RCVBUF, 1500)
- .childHandler(new FileServerChannelInitializer(pResolver));
- // Start the server.
- channel = bootstrap.bind(port).sync().channel();
- channel.closeFuture().sync();
- } catch (InterruptedException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- } finally{
- bootstrap.shutdown();
- }
+ public void stop() {
+ if (channel!=null) {
+ channel.close();
}
-
- public void stop(){
- if (channel!=null){
- channel.close();
- }
- if (bootstrap != null){
- bootstrap.shutdown();
- }
+ if (bootstrap != null) {
+ bootstrap.shutdown();
}
+ }
}
-
-
diff --git a/core/src/main/java/spark/network/netty/FileServerChannelInitializer.java b/core/src/main/java/spark/network/netty/FileServerChannelInitializer.java
index 9d0618ff1c..8f1f5c65cd 100644
--- a/core/src/main/java/spark/network/netty/FileServerChannelInitializer.java
+++ b/core/src/main/java/spark/network/netty/FileServerChannelInitializer.java
@@ -1,21 +1,15 @@
package spark.network.netty;
-import java.io.File;
-import io.netty.buffer.BufType;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
-import io.netty.handler.codec.string.StringDecoder;
-import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.Delimiters;
-import io.netty.util.CharsetUtil;
-import io.netty.handler.logging.LoggingHandler;
-import io.netty.handler.logging.LogLevel;
+import io.netty.handler.codec.string.StringDecoder;
+
-public class FileServerChannelInitializer extends
- ChannelInitializer<SocketChannel> {
+class FileServerChannelInitializer extends ChannelInitializer<SocketChannel> {
- PathResolver pResolver;
+ PathResolver pResolver;
public FileServerChannelInitializer(PathResolver pResolver) {
this.pResolver = pResolver;
@@ -24,10 +18,8 @@ public class FileServerChannelInitializer extends
@Override
public void initChannel(SocketChannel channel) {
channel.pipeline()
- .addLast("framer", new DelimiterBasedFrameDecoder(
- 8192, Delimiters.lineDelimiter()))
- .addLast("strDecoder", new StringDecoder())
- .addLast("handler", new FileServerHandler(pResolver));
-
+ .addLast("framer", new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()))
+ .addLast("strDecoder", new StringDecoder())
+ .addLast("handler", new FileServerHandler(pResolver));
}
}
diff --git a/core/src/main/java/spark/network/netty/FileServerHandler.java b/core/src/main/java/spark/network/netty/FileServerHandler.java
index e1083e87a2..a78eddb1b5 100644
--- a/core/src/main/java/spark/network/netty/FileServerHandler.java
+++ b/core/src/main/java/spark/network/netty/FileServerHandler.java
@@ -1,17 +1,17 @@
package spark.network.netty;
+import java.io.File;
+import java.io.FileInputStream;
+
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundMessageHandlerAdapter;
import io.netty.channel.DefaultFileRegion;
-import io.netty.handler.stream.ChunkedFile;
-import java.io.File;
-import java.io.FileInputStream;
-public class FileServerHandler extends
- ChannelInboundMessageHandlerAdapter<String> {
- PathResolver pResolver;
-
+class FileServerHandler extends ChannelInboundMessageHandlerAdapter<String> {
+
+ PathResolver pResolver;
+
public FileServerHandler(PathResolver pResolver){
this.pResolver = pResolver;
}
@@ -21,8 +21,8 @@ public class FileServerHandler extends
String path = pResolver.getAbsolutePath(blockId);
// if getFilePath returns null, close the channel
if (path == null) {
- //ctx.close();
- return;
+ //ctx.close();
+ return;
}
File file = new File(path);
if (file.exists()) {
@@ -33,23 +33,21 @@ public class FileServerHandler extends
return;
}
long length = file.length();
- if (length > Integer.MAX_VALUE || length <= 0 ) {
+ if (length > Integer.MAX_VALUE || length <= 0) {
//logger.info("too large file : " + file.getAbsolutePath() + " of size "+ length);
ctx.write(new FileHeader(0, blockId).buffer());
ctx.flush();
- return;
+ return;
}
int len = new Long(length).intValue();
//logger.info("Sending block "+blockId+" filelen = "+len);
//logger.info("header = "+ (new FileHeader(len, blockId)).buffer());
ctx.write((new FileHeader(len, blockId)).buffer());
try {
- ctx.sendFile(new DefaultFileRegion(new FileInputStream(file)
- .getChannel(), 0, file.length()));
+ ctx.sendFile(new DefaultFileRegion(new FileInputStream(file)
+ .getChannel(), 0, file.length()));
} catch (Exception e) {
- // TODO Auto-generated catch block
- //logger.warning("Exception when sending file : "
- //+ file.getAbsolutePath());
+ //logger.warning("Exception when sending file : " + file.getAbsolutePath());
e.printStackTrace();
}
} else {
@@ -58,8 +56,7 @@ public class FileServerHandler extends
}
ctx.flush();
}
-
-
+
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
diff --git a/core/src/main/java/spark/network/netty/PathResolver.java b/core/src/main/java/spark/network/netty/PathResolver.java
index 5d5eda006e..302411672c 100755
--- a/core/src/main/java/spark/network/netty/PathResolver.java
+++ b/core/src/main/java/spark/network/netty/PathResolver.java
@@ -1,12 +1,12 @@
package spark.network.netty;
+
public interface PathResolver {
/**
* Get the absolute path of the file
- *
+ *
* @param fileId
* @return the absolute path of file
*/
public String getAbsolutePath(String fileId);
-
}
diff --git a/core/src/main/scala/spark/storage/DiskStore.scala b/core/src/main/scala/spark/storage/DiskStore.scala
index 82bcbd5bc2..be33d4260e 100644
--- a/core/src/main/scala/spark/storage/DiskStore.scala
+++ b/core/src/main/scala/spark/storage/DiskStore.scala
@@ -288,7 +288,7 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String)
val port = System.getProperty("spark.shuffle.sender.port", "6653").toInt
val pResolver = new PathResolver {
- def getAbsolutePath(blockId:String):String = {
+ override def getAbsolutePath(blockId: String): String = {
if (!blockId.startsWith("shuffle_")) {
return null
}
@@ -298,7 +298,7 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String)
shuffleSender = new Thread {
override def run() = {
val sender = new ShuffleSender(port,pResolver)
- logInfo("created ShuffleSender binding to port : "+ port)
+ logInfo("Created ShuffleSender binding to port : "+ port)
sender.start
}
}