aboutsummaryrefslogtreecommitdiff
path: root/core/src/main/java
diff options
context:
space:
mode:
authorReynold Xin <rxin@cs.berkeley.edu>2013-05-24 16:39:33 -0700
committerReynold Xin <rxin@cs.berkeley.edu>2013-05-24 16:39:33 -0700
commit26962c9340ac92b11d43e87200e699471d0b6330 (patch)
tree0122f5141df5f79036dd8e2b76cc91b47dc4b822 /core/src/main/java
parent6ea085169d8ba2d09ca9236273d65238b8411f04 (diff)
downloadspark-26962c9340ac92b11d43e87200e699471d0b6330.tar.gz
spark-26962c9340ac92b11d43e87200e699471d0b6330.tar.bz2
spark-26962c9340ac92b11d43e87200e699471d0b6330.zip
Automatically configure Netty port. This makes unit tests using
local-cluster pass. Previously they were failing because Netty was trying to bind to the same port for all processes. Pair programmed with @shivaram.
Diffstat (limited to 'core/src/main/java')
-rw-r--r--core/src/main/java/spark/network/netty/FileServer.java68
1 files changed, 50 insertions, 18 deletions
diff --git a/core/src/main/java/spark/network/netty/FileServer.java b/core/src/main/java/spark/network/netty/FileServer.java
index 647b26bf8a..dd3f12561c 100644
--- a/core/src/main/java/spark/network/netty/FileServer.java
+++ b/core/src/main/java/spark/network/netty/FileServer.java
@@ -1,51 +1,83 @@
package spark.network.netty;
+import java.net.InetSocketAddress;
+
import io.netty.bootstrap.ServerBootstrap;
-import io.netty.channel.ChannelOption;
import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelOption;
import io.netty.channel.oio.OioEventLoopGroup;
import io.netty.channel.socket.oio.OioServerSocketChannel;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
/**
* Server that accept the path of a file an echo back its content.
*/
class FileServer {
+ private Logger LOG = LoggerFactory.getLogger(this.getClass().getName());
+
private ServerBootstrap bootstrap = null;
- private Channel channel = null;
- private PathResolver pResolver;
+ private ChannelFuture channelFuture = null;
+ private int port = 0;
+ private Thread blockingThread = null;
- public FileServer(PathResolver pResolver) {
- this.pResolver = pResolver;
- }
+ public FileServer(PathResolver pResolver, int port) {
+ InetSocketAddress addr = new InetSocketAddress(port);
- public void run(int port) {
// Configure the server.
bootstrap = new ServerBootstrap();
- try {
- bootstrap.group(new OioEventLoopGroup(), new OioEventLoopGroup())
+ bootstrap.group(new OioEventLoopGroup(), new OioEventLoopGroup())
.channel(OioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 100)
.option(ChannelOption.SO_RCVBUF, 1500)
.childHandler(new FileServerChannelInitializer(pResolver));
- // Start the server.
- channel = bootstrap.bind(port).sync().channel();
- channel.closeFuture().sync();
- } catch (InterruptedException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- } finally{
+ // Start the server.
+ channelFuture = bootstrap.bind(addr);
+ this.port = addr.getPort();
+ }
+
+ /**
+ * Start the file server asynchronously in a new thread.
+ */
+ public void start() {
+ try {
+ blockingThread = new Thread() {
+ public void run() {
+ try {
+ Channel channel = channelFuture.sync().channel();
+ channel.closeFuture().sync();
+ } catch (InterruptedException e) {
+ LOG.error("File server start got interrupted", e);
+ }
+ }
+ };
+ blockingThread.setDaemon(true);
+ blockingThread.start();
+ } finally {
bootstrap.shutdown();
}
}
+ public int getPort() {
+ return port;
+ }
+
public void stop() {
- if (channel!=null) {
- channel.close();
+ if (blockingThread != null) {
+ blockingThread.stop();
+ blockingThread = null;
+ }
+ if (channelFuture != null) {
+ channelFuture.channel().closeFuture();
+ channelFuture = null;
}
if (bootstrap != null) {
bootstrap.shutdown();
+ bootstrap = null;
}
}
}