aboutsummaryrefslogtreecommitdiff
path: root/core/src/main/java
diff options
context:
space:
mode:
authorShivaram Venkataraman <shivaram@eecs.berkeley.edu>2013-05-28 16:27:16 -0700
committerShivaram Venkataraman <shivaram@eecs.berkeley.edu>2013-05-28 16:27:16 -0700
commitfbc1ab346867d5c81dc59e4c8d85aeda2f516ce2 (patch)
tree4a24eb7206009d046cdaa5bd59606713d5c179d0 /core/src/main/java
parent3db1e17baa11fa37b0c7f04d7213a30df66d1611 (diff)
downloadspark-fbc1ab346867d5c81dc59e4c8d85aeda2f516ce2.tar.gz
spark-fbc1ab346867d5c81dc59e4c8d85aeda2f516ce2.tar.bz2
spark-fbc1ab346867d5c81dc59e4c8d85aeda2f516ce2.zip
Couple of Netty fixes
a. Fix the port number by reading it from the bound channel b. Fix the shutdown sequence to make sure we actually block on the channel c. Fix the unit test to use two JVMs.
Diffstat (limited to 'core/src/main/java')
-rw-r--r--core/src/main/java/spark/network/netty/FileServer.java45
1 files changed, 24 insertions, 21 deletions
diff --git a/core/src/main/java/spark/network/netty/FileServer.java b/core/src/main/java/spark/network/netty/FileServer.java
index dd3f12561c..dd3a557ae5 100644
--- a/core/src/main/java/spark/network/netty/FileServer.java
+++ b/core/src/main/java/spark/network/netty/FileServer.java
@@ -37,29 +37,33 @@ class FileServer {
.childHandler(new FileServerChannelInitializer(pResolver));
// Start the server.
channelFuture = bootstrap.bind(addr);
- this.port = addr.getPort();
+ try {
+ // Get the address we bound to.
+ InetSocketAddress boundAddress =
+ ((InetSocketAddress) channelFuture.sync().channel().localAddress());
+ this.port = boundAddress.getPort();
+ } catch (InterruptedException ie) {
+ this.port = 0;
+ }
}
/**
* Start the file server asynchronously in a new thread.
*/
public void start() {
- try {
- blockingThread = new Thread() {
- public void run() {
- try {
- Channel channel = channelFuture.sync().channel();
- channel.closeFuture().sync();
- } catch (InterruptedException e) {
- LOG.error("File server start got interrupted", e);
- }
+ blockingThread = new Thread() {
+ public void run() {
+ try {
+ channelFuture.channel().closeFuture().sync();
+ LOG.info("FileServer exiting");
+ } catch (InterruptedException e) {
+ LOG.error("File server start got interrupted", e);
}
- };
- blockingThread.setDaemon(true);
- blockingThread.start();
- } finally {
- bootstrap.shutdown();
- }
+ // NOTE: bootstrap is shutdown in stop()
+ }
+ };
+ blockingThread.setDaemon(true);
+ blockingThread.start();
}
public int getPort() {
@@ -67,17 +71,16 @@ class FileServer {
}
public void stop() {
- if (blockingThread != null) {
- blockingThread.stop();
- blockingThread = null;
- }
+ // Close the bound channel.
if (channelFuture != null) {
- channelFuture.channel().closeFuture();
+ channelFuture.channel().close();
channelFuture = null;
}
+ // Shutdown bootstrap.
if (bootstrap != null) {
bootstrap.shutdown();
bootstrap = null;
}
+ // TODO: Shutdown all accepted channels as well ?
}
}