author | Shivaram Venkataraman <shivaram@eecs.berkeley.edu> | 2013-05-28 16:27:16 -0700 |
---|---|---|
committer | Shivaram Venkataraman <shivaram@eecs.berkeley.edu> | 2013-05-28 16:27:16 -0700 |
commit | fbc1ab346867d5c81dc59e4c8d85aeda2f516ce2 (patch) | |
tree | 4a24eb7206009d046cdaa5bd59606713d5c179d0 /core | |
parent | 3db1e17baa11fa37b0c7f04d7213a30df66d1611 (diff) | |
download | spark-fbc1ab346867d5c81dc59e4c8d85aeda2f516ce2.tar.gz spark-fbc1ab346867d5c81dc59e4c8d85aeda2f516ce2.tar.bz2 spark-fbc1ab346867d5c81dc59e4c8d85aeda2f516ce2.zip |
Couple of Netty fixes
a. Fix the port number by reading it back from the bound channel instead of the requested address.
b. Fix the shutdown sequence so that we actually block on the channel until it is closed.
c. Fix the unit test to use two worker JVMs, so the shuffle exercises both local and remote blocks.
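
Fix (a) matters because `bootstrap.bind(addr)` is asynchronous and, when the requested port is 0, the OS assigns an ephemeral port, so `addr.getPort()` does not report the port actually in use. The remedy is to sync on the bind and read the local address back from the bound channel. Below is a minimal, self-contained sketch of that pattern against the stock Netty 4 API; the class name and empty child initializer are illustrative, not part of this patch:

```java
import java.net.InetSocketAddress;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

public class EphemeralPortExample {
  public static void main(String[] args) throws InterruptedException {
    EventLoopGroup boss = new NioEventLoopGroup(1);
    EventLoopGroup workers = new NioEventLoopGroup();
    try {
      ServerBootstrap bootstrap = new ServerBootstrap()
          .group(boss, workers)
          .channel(NioServerSocketChannel.class)
          .childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel ch) {
              // No handlers: this sketch only demonstrates port discovery.
            }
          });

      // Ask for port 0 so the OS assigns a free ephemeral port.
      ChannelFuture f = bootstrap.bind(new InetSocketAddress(0)).sync();

      // Read the real port back from the *bound* channel; the requested
      // address would still report 0 here.
      int port = ((InetSocketAddress) f.channel().localAddress()).getPort();
      System.out.println("Bound to port " + port);

      f.channel().close().sync();
    } finally {
      boss.shutdownGracefully();
      workers.shutdownGracefully();
    }
  }
}
```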
Diffstat (limited to 'core')
-rw-r--r-- | core/src/main/java/spark/network/netty/FileServer.java | 45
-rw-r--r-- | core/src/test/scala/spark/ShuffleSuite.scala | 14
2 files changed, 37 insertions, 22 deletions
```diff
diff --git a/core/src/main/java/spark/network/netty/FileServer.java b/core/src/main/java/spark/network/netty/FileServer.java
index dd3f12561c..dd3a557ae5 100644
--- a/core/src/main/java/spark/network/netty/FileServer.java
+++ b/core/src/main/java/spark/network/netty/FileServer.java
@@ -37,29 +37,33 @@ class FileServer {
         .childHandler(new FileServerChannelInitializer(pResolver));
     // Start the server.
     channelFuture = bootstrap.bind(addr);
-    this.port = addr.getPort();
+    try {
+      // Get the address we bound to.
+      InetSocketAddress boundAddress =
+        ((InetSocketAddress) channelFuture.sync().channel().localAddress());
+      this.port = boundAddress.getPort();
+    } catch (InterruptedException ie) {
+      this.port = 0;
+    }
   }
 
   /**
    * Start the file server asynchronously in a new thread.
    */
   public void start() {
-    try {
-      blockingThread = new Thread() {
-        public void run() {
-          try {
-            Channel channel = channelFuture.sync().channel();
-            channel.closeFuture().sync();
-          } catch (InterruptedException e) {
-            LOG.error("File server start got interrupted", e);
-          }
+    blockingThread = new Thread() {
+      public void run() {
+        try {
+          channelFuture.channel().closeFuture().sync();
+          LOG.info("FileServer exiting");
+        } catch (InterruptedException e) {
+          LOG.error("File server start got interrupted", e);
         }
-      };
-      blockingThread.setDaemon(true);
-      blockingThread.start();
-    } finally {
-      bootstrap.shutdown();
-    }
+        // NOTE: bootstrap is shutdown in stop()
+      }
+    };
+    blockingThread.setDaemon(true);
+    blockingThread.start();
   }
 
   public int getPort() {
@@ -67,17 +71,16 @@ class FileServer {
   }
 
   public void stop() {
-    if (blockingThread != null) {
-      blockingThread.stop();
-      blockingThread = null;
-    }
+    // Close the bound channel.
     if (channelFuture != null) {
-      channelFuture.channel().closeFuture();
+      channelFuture.channel().close();
       channelFuture = null;
     }
+    // Shutdown bootstrap.
     if (bootstrap != null) {
       bootstrap.shutdown();
       bootstrap = null;
     }
+    // TODO: Shutdown all accepted channels as well ?
   }
 }
diff --git a/core/src/test/scala/spark/ShuffleSuite.scala b/core/src/test/scala/spark/ShuffleSuite.scala
index fdee7ca384..a4fe14b9ae 100644
--- a/core/src/test/scala/spark/ShuffleSuite.scala
+++ b/core/src/test/scala/spark/ShuffleSuite.scala
@@ -305,9 +305,20 @@ class ShuffleSuite extends FunSuite with ShouldMatchers with LocalSparkContext {
     assert(c.partitioner.get === p)
   }
 
+  test("shuffle local cluster") {
+    // Use a local cluster with 2 processes to make sure there are both local and remote blocks
+    sc = new SparkContext("local-cluster[2,1,512]", "test")
+    val a = sc.parallelize(1 to 10, 2)
+    val b = a.map {
+      x => (x, x * 2)
+    }
+    val c = new ShuffledRDD(b, new HashPartitioner(3))
+    assert(c.count === 10)
+  }
+
   test("shuffle serializer") {
     // Use a local cluster with 2 processes to make sure there are both local and remote blocks
-    sc = new SparkContext("local-cluster[1,2,512]", "test")
+    sc = new SparkContext("local-cluster[2,1,512]", "test")
     val a = sc.parallelize(1 to 10, 2)
     val b = a.map { x =>
       (x, new ShuffleSuite.NonJavaSerializableClass(x * 2))
@@ -317,6 +328,7 @@ class ShuffleSuite extends FunSuite with ShouldMatchers with LocalSparkContext {
     val c = new ShuffledRDD(b, new HashPartitioner(3), classOf[spark.KryoSerializer].getName)
     assert(c.count === 10)
   }
+
 }
 
 object ShuffleSuite {
```
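
Fix (b) follows the standard Netty lifecycle pattern: the background thread must block on `closeFuture().sync()`, which only returns once the channel is actually closed, and `stop()` must call `close()` on the channel, since the old `closeFuture()` call merely fetched the future without closing anything. Here is a condensed sketch of that block-then-close pattern, assuming the current Netty 4 API, where `shutdownGracefully()` on the event loop group replaces the long-removed `bootstrap.shutdown()`; the class name is illustrative, not the actual FileServer:

```java
import java.net.InetSocketAddress;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

class BlockingServer {
  private final EventLoopGroup group = new NioEventLoopGroup();
  private ChannelFuture channelFuture;
  private Thread blockingThread;

  /** Bind, then block on the server channel from a background daemon thread. */
  public void start() throws InterruptedException {
    ServerBootstrap bootstrap = new ServerBootstrap()
        .group(group)
        .channel(NioServerSocketChannel.class)
        .childHandler(new ChannelInitializer<SocketChannel>() {
          @Override
          protected void initChannel(SocketChannel ch) { /* no handlers in this sketch */ }
        });
    channelFuture = bootstrap.bind(new InetSocketAddress(0)).sync();

    blockingThread = new Thread(() -> {
      try {
        // closeFuture().sync() returns only once the channel is closed, so
        // this thread stays alive for the server's whole lifetime.
        channelFuture.channel().closeFuture().sync();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });
    blockingThread.setDaemon(true);
    blockingThread.start();
  }

  /** Close the channel (unblocking start()'s thread), then release the event loop. */
  public void stop() {
    if (channelFuture != null) {
      // close(), not closeFuture(): closeFuture() is just a getter and closes nothing.
      channelFuture.channel().close();
      channelFuture = null;
    }
    group.shutdownGracefully();
  }
}
```

On the test side, fix (c) replaces `local-cluster[1,2,512]` (one worker JVM with two cores) with `local-cluster[2,1,512]` (two single-core worker JVMs), which forces the shuffle to fetch blocks from a remote executor's file server as well as local ones.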