From fbc1ab346867d5c81dc59e4c8d85aeda2f516ce2 Mon Sep 17 00:00:00 2001 From: Shivaram Venkataraman Date: Tue, 28 May 2013 16:27:16 -0700 Subject: Couple of Netty fixes a. Fix the port number by reading it from the bound channel b. Fix the shutdown sequence to make sure we actually block on the channel c. Fix the unit test to use two JVMs. --- .../main/java/spark/network/netty/FileServer.java | 45 ++++++++++++---------- core/src/test/scala/spark/ShuffleSuite.scala | 14 ++++++- 2 files changed, 37 insertions(+), 22 deletions(-) diff --git a/core/src/main/java/spark/network/netty/FileServer.java b/core/src/main/java/spark/network/netty/FileServer.java index dd3f12561c..dd3a557ae5 100644 --- a/core/src/main/java/spark/network/netty/FileServer.java +++ b/core/src/main/java/spark/network/netty/FileServer.java @@ -37,29 +37,33 @@ class FileServer { .childHandler(new FileServerChannelInitializer(pResolver)); // Start the server. channelFuture = bootstrap.bind(addr); - this.port = addr.getPort(); + try { + // Get the address we bound to. + InetSocketAddress boundAddress = + ((InetSocketAddress) channelFuture.sync().channel().localAddress()); + this.port = boundAddress.getPort(); + } catch (InterruptedException ie) { + this.port = 0; + } } /** * Start the file server asynchronously in a new thread. */ public void start() { - try { - blockingThread = new Thread() { - public void run() { - try { - Channel channel = channelFuture.sync().channel(); - channel.closeFuture().sync(); - } catch (InterruptedException e) { - LOG.error("File server start got interrupted", e); - } + blockingThread = new Thread() { + public void run() { + try { + channelFuture.channel().closeFuture().sync(); + LOG.info("FileServer exiting"); + } catch (InterruptedException e) { + LOG.error("File server start got interrupted", e); } - }; - blockingThread.setDaemon(true); - blockingThread.start(); - } finally { - bootstrap.shutdown(); - } + // NOTE: bootstrap is shutdown in stop() + } + }; + blockingThread.setDaemon(true); + blockingThread.start(); } public int getPort() { @@ -67,17 +71,16 @@ class FileServer { } public void stop() { - if (blockingThread != null) { - blockingThread.stop(); - blockingThread = null; - } + // Close the bound channel. if (channelFuture != null) { - channelFuture.channel().closeFuture(); + channelFuture.channel().close(); channelFuture = null; } + // Shutdown bootstrap. if (bootstrap != null) { bootstrap.shutdown(); bootstrap = null; } + // TODO: Shutdown all accepted channels as well ? } } diff --git a/core/src/test/scala/spark/ShuffleSuite.scala b/core/src/test/scala/spark/ShuffleSuite.scala index fdee7ca384..a4fe14b9ae 100644 --- a/core/src/test/scala/spark/ShuffleSuite.scala +++ b/core/src/test/scala/spark/ShuffleSuite.scala @@ -305,9 +305,20 @@ class ShuffleSuite extends FunSuite with ShouldMatchers with LocalSparkContext { assert(c.partitioner.get === p) } + test("shuffle local cluster") { + // Use a local cluster with 2 processes to make sure there are both local and remote blocks + sc = new SparkContext("local-cluster[2,1,512]", "test") + val a = sc.parallelize(1 to 10, 2) + val b = a.map { + x => (x, x * 2) + } + val c = new ShuffledRDD(b, new HashPartitioner(3)) + assert(c.count === 10) + } + test("shuffle serializer") { // Use a local cluster with 2 processes to make sure there are both local and remote blocks - sc = new SparkContext("local-cluster[1,2,512]", "test") + sc = new SparkContext("local-cluster[2,1,512]", "test") val a = sc.parallelize(1 to 10, 2) val b = a.map { x => (x, new ShuffleSuite.NonJavaSerializableClass(x * 2)) @@ -317,6 +328,7 @@ class ShuffleSuite extends FunSuite with ShouldMatchers with LocalSparkContext { val c = new ShuffledRDD(b, new HashPartitioner(3), classOf[spark.KryoSerializer].getName) assert(c.count === 10) } + } object ShuffleSuite { -- cgit v1.2.3 From b79b10a6d60a7f1f199e6bddd1243a05c57526ad Mon Sep 17 00:00:00 2001 From: Shivaram Venkataraman Date: Wed, 29 May 2013 00:52:55 -0700 Subject: Flush serializer to fix zero-size kryo blocks bug. Also convert the local-cluster test case to check for non-zero block sizes --- core/src/main/scala/spark/storage/DiskStore.scala | 2 ++ core/src/test/scala/spark/ShuffleSuite.scala | 22 +++++++++++++++++----- 2 files changed, 19 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/spark/storage/DiskStore.scala b/core/src/main/scala/spark/storage/DiskStore.scala index 57d4dafefc..1829c2f92e 100644 --- a/core/src/main/scala/spark/storage/DiskStore.scala +++ b/core/src/main/scala/spark/storage/DiskStore.scala @@ -59,6 +59,8 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String) // Flush the partial writes, and set valid length to be the length of the entire file. // Return the number of bytes written for this commit. override def commit(): Long = { + // NOTE: Flush the serializer first and then the compressed/buffered output stream + objOut.flush() bs.flush() val prevPos = lastValidPosition lastValidPosition = channel.position() diff --git a/core/src/test/scala/spark/ShuffleSuite.scala b/core/src/test/scala/spark/ShuffleSuite.scala index a4fe14b9ae..271f4a4e44 100644 --- a/core/src/test/scala/spark/ShuffleSuite.scala +++ b/core/src/test/scala/spark/ShuffleSuite.scala @@ -305,15 +305,27 @@ class ShuffleSuite extends FunSuite with ShouldMatchers with LocalSparkContext { assert(c.partitioner.get === p) } - test("shuffle local cluster") { - // Use a local cluster with 2 processes to make sure there are both local and remote blocks + test("shuffle non-zero block size") { sc = new SparkContext("local-cluster[2,1,512]", "test") + val NUM_BLOCKS = 3 + val a = sc.parallelize(1 to 10, 2) - val b = a.map { - x => (x, x * 2) + val b = a.map { x => + (x, new ShuffleSuite.NonJavaSerializableClass(x * 2)) } - val c = new ShuffledRDD(b, new HashPartitioner(3)) + // If the Kryo serializer is not used correctly, the shuffle would fail because the + // default Java serializer cannot handle the non serializable class. + val c = new ShuffledRDD(b, new HashPartitioner(NUM_BLOCKS), + classOf[spark.KryoSerializer].getName) + val shuffleId = c.dependencies.head.asInstanceOf[ShuffleDependency[Int, Int]].shuffleId + assert(c.count === 10) + + // All blocks must have non-zero size + (0 until NUM_BLOCKS).foreach { id => + val statuses = SparkEnv.get.mapOutputTracker.getServerStatuses(shuffleId, id) + assert(statuses.forall(s => s._2 > 0)) + } } test("shuffle serializer") { -- cgit v1.2.3 From 19fd6d54c012bd9f73620e9b817f4975de162277 Mon Sep 17 00:00:00 2001 From: Shivaram Venkataraman Date: Wed, 29 May 2013 17:29:34 -0700 Subject: Also flush serializer in revertPartialWrites --- core/src/main/scala/spark/storage/DiskStore.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/core/src/main/scala/spark/storage/DiskStore.scala b/core/src/main/scala/spark/storage/DiskStore.scala index 1829c2f92e..c7281200e7 100644 --- a/core/src/main/scala/spark/storage/DiskStore.scala +++ b/core/src/main/scala/spark/storage/DiskStore.scala @@ -70,6 +70,7 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String) override def revertPartialWrites() { // Discard current writes. We do this by flushing the outstanding writes and // truncate the file to the last valid position. + objOut.flush() bs.flush() channel.truncate(lastValidPosition) } -- cgit v1.2.3