From 26962c9340ac92b11d43e87200e699471d0b6330 Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Fri, 24 May 2013 16:39:33 -0700 Subject: Automatically configure Netty port. This makes unit tests using local-cluster pass. Previously they were failing because Netty was trying to bind to the same port for all processes. Pair programmed with @shivaram. --- .../main/java/spark/network/netty/FileServer.java | 68 ++++++++++++++++------ .../scala/spark/network/netty/ShuffleSender.scala | 23 ++++---- .../scala/spark/storage/BlockFetcherIterator.scala | 3 +- .../main/scala/spark/storage/BlockManager.scala | 13 +++-- .../main/scala/spark/storage/BlockManagerId.scala | 32 +++++++--- core/src/main/scala/spark/storage/DiskStore.scala | 52 ++++------------- .../test/scala/spark/MapOutputTrackerSuite.scala | 28 ++++----- core/src/test/scala/spark/ShuffleSuite.scala | 2 +- .../scala/spark/scheduler/DAGSchedulerSuite.scala | 14 ++--- .../scala/spark/storage/BlockManagerSuite.scala | 6 +- 10 files changed, 129 insertions(+), 112 deletions(-) diff --git a/core/src/main/java/spark/network/netty/FileServer.java b/core/src/main/java/spark/network/netty/FileServer.java index 647b26bf8a..dd3f12561c 100644 --- a/core/src/main/java/spark/network/netty/FileServer.java +++ b/core/src/main/java/spark/network/netty/FileServer.java @@ -1,51 +1,83 @@ package spark.network.netty; +import java.net.InetSocketAddress; + import io.netty.bootstrap.ServerBootstrap; -import io.netty.channel.ChannelOption; import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelOption; import io.netty.channel.oio.OioEventLoopGroup; import io.netty.channel.socket.oio.OioServerSocketChannel; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + /** * Server that accept the path of a file an echo back its content. */ class FileServer { + private Logger LOG = LoggerFactory.getLogger(this.getClass().getName()); + private ServerBootstrap bootstrap = null; - private Channel channel = null; - private PathResolver pResolver; + private ChannelFuture channelFuture = null; + private int port = 0; + private Thread blockingThread = null; - public FileServer(PathResolver pResolver) { - this.pResolver = pResolver; - } + public FileServer(PathResolver pResolver, int port) { + InetSocketAddress addr = new InetSocketAddress(port); - public void run(int port) { // Configure the server. bootstrap = new ServerBootstrap(); - try { - bootstrap.group(new OioEventLoopGroup(), new OioEventLoopGroup()) + bootstrap.group(new OioEventLoopGroup(), new OioEventLoopGroup()) .channel(OioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 100) .option(ChannelOption.SO_RCVBUF, 1500) .childHandler(new FileServerChannelInitializer(pResolver)); - // Start the server. - channel = bootstrap.bind(port).sync().channel(); - channel.closeFuture().sync(); - } catch (InterruptedException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } finally{ + // Start the server. + channelFuture = bootstrap.bind(addr); + this.port = addr.getPort(); + } + + /** + * Start the file server asynchronously in a new thread. 
+ */ + public void start() { + try { + blockingThread = new Thread() { + public void run() { + try { + Channel channel = channelFuture.sync().channel(); + channel.closeFuture().sync(); + } catch (InterruptedException e) { + LOG.error("File server start got interrupted", e); + } + } + }; + blockingThread.setDaemon(true); + blockingThread.start(); + } finally { bootstrap.shutdown(); } } + public int getPort() { + return port; + } + public void stop() { - if (channel!=null) { - channel.close(); + if (blockingThread != null) { + blockingThread.stop(); + blockingThread = null; + } + if (channelFuture != null) { + channelFuture.channel().closeFuture(); + channelFuture = null; } if (bootstrap != null) { bootstrap.shutdown(); + bootstrap = null; } } } diff --git a/core/src/main/scala/spark/network/netty/ShuffleSender.scala b/core/src/main/scala/spark/network/netty/ShuffleSender.scala index dc87fefc56..d6fa4b1e80 100644 --- a/core/src/main/scala/spark/network/netty/ShuffleSender.scala +++ b/core/src/main/scala/spark/network/netty/ShuffleSender.scala @@ -5,23 +5,22 @@ import java.io.File import spark.Logging -private[spark] class ShuffleSender(val port: Int, val pResolver: PathResolver) extends Logging { - val server = new FileServer(pResolver) +private[spark] class ShuffleSender(portIn: Int, val pResolver: PathResolver) extends Logging { - Runtime.getRuntime().addShutdownHook( - new Thread() { - override def run() { - server.stop() - } - } - ) + val server = new FileServer(pResolver, portIn) + server.start() - def start() { - server.run(port) + def stop() { + server.stop() } + + def port: Int = server.getPort() } +/** + * An application for testing the shuffle sender as a standalone program. + */ private[spark] object ShuffleSender { def main(args: Array[String]) { @@ -50,7 +49,5 @@ private[spark] object ShuffleSender { } } val sender = new ShuffleSender(port, pResovler) - - sender.start() } } diff --git a/core/src/main/scala/spark/storage/BlockFetcherIterator.scala b/core/src/main/scala/spark/storage/BlockFetcherIterator.scala index 88eed0d8c8..95308c7282 100644 --- a/core/src/main/scala/spark/storage/BlockFetcherIterator.scala +++ b/core/src/main/scala/spark/storage/BlockFetcherIterator.scala @@ -272,8 +272,7 @@ object BlockFetcherIterator { logDebug("Sending request for %d blocks (%s) from %s".format( req.blocks.size, Utils.memoryBytesToString(req.size), req.address.host)) - val cmId = new ConnectionManagerId( - req.address.host, System.getProperty("spark.shuffle.sender.port", "6653").toInt) + val cmId = new ConnectionManagerId(req.address.host, req.address.nettyPort) val cpier = new ShuffleCopier cpier.getBlocks(cmId, req.blocks, putResult) logDebug("Sent request for remote blocks " + req.blocks + " from " + req.address.host ) diff --git a/core/src/main/scala/spark/storage/BlockManager.scala b/core/src/main/scala/spark/storage/BlockManager.scala index 40d608628e..d35c43f194 100644 --- a/core/src/main/scala/spark/storage/BlockManager.scala +++ b/core/src/main/scala/spark/storage/BlockManager.scala @@ -94,11 +94,16 @@ private[spark] class BlockManager( private[storage] val diskStore: DiskStore = new DiskStore(this, System.getProperty("spark.local.dir", System.getProperty("java.io.tmpdir"))) + // If we use Netty for shuffle, start a new Netty-based shuffle sender service. 
+ private val useNetty = System.getProperty("spark.shuffle.use.netty", "false").toBoolean + private val nettyPortConfig = System.getProperty("spark.shuffle.sender.port", "0").toInt + private val nettyPort = if (useNetty) diskStore.startShuffleBlockSender(nettyPortConfig) else 0 + val connectionManager = new ConnectionManager(0) implicit val futureExecContext = connectionManager.futureExecContext val blockManagerId = BlockManagerId( - executorId, connectionManager.id.host, connectionManager.id.port) + executorId, connectionManager.id.host, connectionManager.id.port, nettyPort) // Max megabytes of data to keep in flight per reducer (to avoid over-allocating memory // for receiving shuffle outputs) @@ -266,7 +271,6 @@ private[spark] class BlockManager( } } - /** * Get locations of an array of blocks. */ @@ -274,7 +278,7 @@ private[spark] class BlockManager( val startTimeMs = System.currentTimeMillis val locations = master.getLocations(blockIds).toArray logDebug("Got multiple block location in " + Utils.getUsedTimeMs(startTimeMs)) - return locations + locations } /** @@ -971,8 +975,7 @@ private[spark] object BlockManager extends Logging { assert (env != null || blockManagerMaster != null) val locationBlockIds: Seq[Seq[BlockManagerId]] = if (env != null) { - val blockManager = env.blockManager - blockManager.getLocationBlockIds(blockIds) + env.blockManager.getLocationBlockIds(blockIds) } else { blockManagerMaster.getLocations(blockIds) } diff --git a/core/src/main/scala/spark/storage/BlockManagerId.scala b/core/src/main/scala/spark/storage/BlockManagerId.scala index f4a2181490..1e557d6148 100644 --- a/core/src/main/scala/spark/storage/BlockManagerId.scala +++ b/core/src/main/scala/spark/storage/BlockManagerId.scala @@ -7,18 +7,19 @@ import spark.Utils /** * This class represent an unique identifier for a BlockManager. * The first 2 constructors of this class is made private to ensure that - * BlockManagerId objects can be created only using the factory method in - * [[spark.storage.BlockManager$]]. This allows de-duplication of ID objects. + * BlockManagerId objects can be created only using the apply method in + * the companion object. This allows de-duplication of ID objects. * Also, constructor parameters are private to ensure that parameters cannot * be modified from outside this class. 
*/ private[spark] class BlockManagerId private ( private var executorId_ : String, private var host_ : String, - private var port_ : Int + private var port_ : Int, + private var nettyPort_ : Int ) extends Externalizable { - private def this() = this(null, null, 0) // For deserialization only + private def this() = this(null, null, 0, 0) // For deserialization only def executorId: String = executorId_ @@ -39,28 +40,32 @@ private[spark] class BlockManagerId private ( def port: Int = port_ + def nettyPort: Int = nettyPort_ + override def writeExternal(out: ObjectOutput) { out.writeUTF(executorId_) out.writeUTF(host_) out.writeInt(port_) + out.writeInt(nettyPort_) } override def readExternal(in: ObjectInput) { executorId_ = in.readUTF() host_ = in.readUTF() port_ = in.readInt() + nettyPort_ = in.readInt() } @throws(classOf[IOException]) private def readResolve(): Object = BlockManagerId.getCachedBlockManagerId(this) - override def toString = "BlockManagerId(%s, %s, %d)".format(executorId, host, port) + override def toString = "BlockManagerId(%s, %s, %d, %d)".format(executorId, host, port, nettyPort) - override def hashCode: Int = (executorId.hashCode * 41 + host.hashCode) * 41 + port + override def hashCode: Int = (executorId.hashCode * 41 + host.hashCode) * 41 + port + nettyPort override def equals(that: Any) = that match { case id: BlockManagerId => - executorId == id.executorId && port == id.port && host == id.host + executorId == id.executorId && port == id.port && host == id.host && nettyPort == id.nettyPort case _ => false } @@ -69,8 +74,17 @@ private[spark] class BlockManagerId private ( private[spark] object BlockManagerId { - def apply(execId: String, host: String, port: Int) = - getCachedBlockManagerId(new BlockManagerId(execId, host, port)) + /** + * Returns a [[spark.storage.BlockManagerId]] for the given configuraiton. + * + * @param execId ID of the executor. + * @param host Host name of the block manager. + * @param port Port of the block manager. + * @param nettyPort Optional port for the Netty-based shuffle sender. + * @return A new [[spark.storage.BlockManagerId]]. + */ + def apply(execId: String, host: String, port: Int, nettyPort: Int) = + getCachedBlockManagerId(new BlockManagerId(execId, host, port, nettyPort)) def apply(in: ObjectInput) = { val obj = new BlockManagerId() diff --git a/core/src/main/scala/spark/storage/DiskStore.scala b/core/src/main/scala/spark/storage/DiskStore.scala index 933eeaa216..57d4dafefc 100644 --- a/core/src/main/scala/spark/storage/DiskStore.scala +++ b/core/src/main/scala/spark/storage/DiskStore.scala @@ -82,22 +82,15 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String) val MAX_DIR_CREATION_ATTEMPTS: Int = 10 val subDirsPerLocalDir = System.getProperty("spark.diskStore.subDirectories", "64").toInt - var shuffleSender : Thread = null - val thisInstance = this + var shuffleSender : ShuffleSender = null // Create one local directory for each path mentioned in spark.local.dir; then, inside this // directory, create multiple subdirectories that we will hash files into, in order to avoid // having really large inodes at the top level. 
val localDirs = createLocalDirs() val subDirs = Array.fill(localDirs.length)(new Array[File](subDirsPerLocalDir)) - val useNetty = System.getProperty("spark.shuffle.use.netty", "false").toBoolean - addShutdownHook() - if(useNetty){ - startShuffleBlockSender() - } - def getBlockWriter(blockId: String, serializer: Serializer, bufferSize: Int) : BlockObjectWriter = { new DiskBlockObjectWriter(blockId, serializer, bufferSize) @@ -274,8 +267,9 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String) localDirs.foreach { localDir => if (!Utils.hasRootAsShutdownDeleteDir(localDir)) Utils.deleteRecursively(localDir) } - if (useNetty && shuffleSender != null) + if (shuffleSender != null) { shuffleSender.stop + } } catch { case t: Throwable => logError("Exception while deleting local spark dirs", t) } @@ -283,39 +277,17 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String) }) } - private def startShuffleBlockSender() { - try { - val port = System.getProperty("spark.shuffle.sender.port", "6653").toInt - - val pResolver = new PathResolver { - override def getAbsolutePath(blockId: String): String = { - if (!blockId.startsWith("shuffle_")) { - return null - } - thisInstance.getFile(blockId).getAbsolutePath() - } - } - shuffleSender = new Thread { - override def run() = { - val sender = new ShuffleSender(port, pResolver) - logInfo("Created ShuffleSender binding to port : "+ port) - sender.start - } - } - shuffleSender.setDaemon(true) - shuffleSender.start - - } catch { - case interrupted: InterruptedException => - logInfo("Runner thread for ShuffleBlockSender interrupted") - - case e: Exception => { - logError("Error running ShuffleBlockSender ", e) - if (shuffleSender != null) { - shuffleSender.stop - shuffleSender = null + private[storage] def startShuffleBlockSender(port: Int): Int = { + val pResolver = new PathResolver { + override def getAbsolutePath(blockId: String): String = { + if (!blockId.startsWith("shuffle_")) { + return null } + DiskStore.this.getFile(blockId).getAbsolutePath() } } + shuffleSender = new ShuffleSender(port, pResolver) + logInfo("Created ShuffleSender binding to port : "+ shuffleSender.port) + shuffleSender.port } } diff --git a/core/src/test/scala/spark/MapOutputTrackerSuite.scala b/core/src/test/scala/spark/MapOutputTrackerSuite.scala index b5cedc0b68..6e585e1c3a 100644 --- a/core/src/test/scala/spark/MapOutputTrackerSuite.scala +++ b/core/src/test/scala/spark/MapOutputTrackerSuite.scala @@ -8,7 +8,7 @@ import spark.storage.BlockManagerId import spark.util.AkkaUtils class MapOutputTrackerSuite extends FunSuite with LocalSparkContext { - + test("compressSize") { assert(MapOutputTracker.compressSize(0L) === 0) assert(MapOutputTracker.compressSize(1L) === 1) @@ -45,13 +45,13 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext { val compressedSize10000 = MapOutputTracker.compressSize(10000L) val size1000 = MapOutputTracker.decompressSize(compressedSize1000) val size10000 = MapOutputTracker.decompressSize(compressedSize10000) - tracker.registerMapOutput(10, 0, new MapStatus(BlockManagerId("a", "hostA", 1000), + tracker.registerMapOutput(10, 0, new MapStatus(BlockManagerId("a", "hostA", 1000, 0), Array(compressedSize1000, compressedSize10000))) - tracker.registerMapOutput(10, 1, new MapStatus(BlockManagerId("b", "hostB", 1000), + tracker.registerMapOutput(10, 1, new MapStatus(BlockManagerId("b", "hostB", 1000, 0), Array(compressedSize10000, compressedSize1000))) val statuses = tracker.getServerStatuses(10, 0) - 
assert(statuses.toSeq === Seq((BlockManagerId("a", "hostA", 1000), size1000), - (BlockManagerId("b", "hostB", 1000), size10000))) + assert(statuses.toSeq === Seq((BlockManagerId("a", "hostA", 1000, 0), size1000), + (BlockManagerId("b", "hostB", 1000, 0), size10000))) tracker.stop() } @@ -64,14 +64,14 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext { val compressedSize10000 = MapOutputTracker.compressSize(10000L) val size1000 = MapOutputTracker.decompressSize(compressedSize1000) val size10000 = MapOutputTracker.decompressSize(compressedSize10000) - tracker.registerMapOutput(10, 0, new MapStatus(BlockManagerId("a", "hostA", 1000), + tracker.registerMapOutput(10, 0, new MapStatus(BlockManagerId("a", "hostA", 1000, 0), Array(compressedSize1000, compressedSize1000, compressedSize1000))) - tracker.registerMapOutput(10, 1, new MapStatus(BlockManagerId("b", "hostB", 1000), + tracker.registerMapOutput(10, 1, new MapStatus(BlockManagerId("b", "hostB", 1000, 0), Array(compressedSize10000, compressedSize1000, compressedSize1000))) // As if we had two simulatenous fetch failures - tracker.unregisterMapOutput(10, 0, BlockManagerId("a", "hostA", 1000)) - tracker.unregisterMapOutput(10, 0, BlockManagerId("a", "hostA", 1000)) + tracker.unregisterMapOutput(10, 0, BlockManagerId("a", "hostA", 1000, 0)) + tracker.unregisterMapOutput(10, 0, BlockManagerId("a", "hostA", 1000, 0)) // The remaining reduce task might try to grab the output despite the shuffle failure; // this should cause it to fail, and the scheduler will ignore the failure due to the @@ -88,12 +88,12 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext { val masterTracker = new MapOutputTracker() masterTracker.trackerActor = actorSystem.actorOf( Props(new MapOutputTrackerActor(masterTracker)), "MapOutputTracker") - + val (slaveSystem, _) = AkkaUtils.createActorSystem("spark-slave", hostname, 0) val slaveTracker = new MapOutputTracker() slaveTracker.trackerActor = slaveSystem.actorFor( "akka://spark@localhost:" + boundPort + "/user/MapOutputTracker") - + masterTracker.registerShuffle(10, 1) masterTracker.incrementGeneration() slaveTracker.updateGeneration(masterTracker.getGeneration) @@ -102,13 +102,13 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext { val compressedSize1000 = MapOutputTracker.compressSize(1000L) val size1000 = MapOutputTracker.decompressSize(compressedSize1000) masterTracker.registerMapOutput(10, 0, new MapStatus( - BlockManagerId("a", "hostA", 1000), Array(compressedSize1000))) + BlockManagerId("a", "hostA", 1000, 0), Array(compressedSize1000))) masterTracker.incrementGeneration() slaveTracker.updateGeneration(masterTracker.getGeneration) assert(slaveTracker.getServerStatuses(10, 0).toSeq === - Seq((BlockManagerId("a", "hostA", 1000), size1000))) + Seq((BlockManagerId("a", "hostA", 1000, 0), size1000))) - masterTracker.unregisterMapOutput(10, 0, BlockManagerId("a", "hostA", 1000)) + masterTracker.unregisterMapOutput(10, 0, BlockManagerId("a", "hostA", 1000, 0)) masterTracker.incrementGeneration() slaveTracker.updateGeneration(masterTracker.getGeneration) intercept[FetchFailedException] { slaveTracker.getServerStatuses(10, 0) } diff --git a/core/src/test/scala/spark/ShuffleSuite.scala b/core/src/test/scala/spark/ShuffleSuite.scala index fdee7ca384..58c834c735 100644 --- a/core/src/test/scala/spark/ShuffleSuite.scala +++ b/core/src/test/scala/spark/ShuffleSuite.scala @@ -326,5 +326,5 @@ object ShuffleSuite { x + y } - class NonJavaSerializableClass(val value: Int) + 
class NonJavaSerializableClass(val value: Int) extends Serializable } diff --git a/core/src/test/scala/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/spark/scheduler/DAGSchedulerSuite.scala index 16554eac6e..30e6fef950 100644 --- a/core/src/test/scala/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/spark/scheduler/DAGSchedulerSuite.scala @@ -44,7 +44,7 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont override def submitTasks(taskSet: TaskSet) = { // normally done by TaskSetManager taskSet.tasks.foreach(_.generation = mapOutputTracker.getGeneration) - taskSets += taskSet + taskSets += taskSet } override def setListener(listener: TaskSchedulerListener) = {} override def defaultParallelism() = 2 @@ -164,7 +164,7 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont } } } - + /** Sends the rdd to the scheduler for scheduling. */ private def submit( rdd: RDD[_], @@ -174,7 +174,7 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont listener: JobListener = listener) { runEvent(JobSubmitted(rdd, func, partitions, allowLocal, null, listener)) } - + /** Sends TaskSetFailed to the scheduler. */ private def failed(taskSet: TaskSet, message: String) { runEvent(TaskSetFailed(taskSet, message)) @@ -209,11 +209,11 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont runEvent(JobSubmitted(rdd, jobComputeFunc, Array(0), true, null, listener)) assert(results === Map(0 -> 42)) } - + test("run trivial job w/ dependency") { val baseRdd = makeRdd(1, Nil) val finalRdd = makeRdd(1, List(new OneToOneDependency(baseRdd))) - submit(finalRdd, Array(0)) + submit(finalRdd, Array(0)) complete(taskSets(0), Seq((Success, 42))) assert(results === Map(0 -> 42)) } @@ -250,7 +250,7 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont complete(taskSets(1), Seq((Success, 42))) assert(results === Map(0 -> 42)) } - + test("run trivial shuffle with fetch failure") { val shuffleMapRdd = makeRdd(2, Nil) val shuffleDep = new ShuffleDependency(shuffleMapRdd, null) @@ -398,6 +398,6 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont new MapStatus(makeBlockManagerId(host), Array.fill[Byte](reduces)(2)) private def makeBlockManagerId(host: String): BlockManagerId = - BlockManagerId("exec-" + host, host, 12345) + BlockManagerId("exec-" + host, host, 12345, 0) } diff --git a/core/src/test/scala/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/spark/storage/BlockManagerSuite.scala index 71d1f0bcc8..bff2475686 100644 --- a/core/src/test/scala/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/spark/storage/BlockManagerSuite.scala @@ -99,9 +99,9 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT } test("BlockManagerId object caching") { - val id1 = BlockManagerId("e1", "XXX", 1) - val id2 = BlockManagerId("e1", "XXX", 1) // this should return the same object as id1 - val id3 = BlockManagerId("e1", "XXX", 2) // this should return a different object + val id1 = BlockManagerId("e1", "XXX", 1, 0) + val id2 = BlockManagerId("e1", "XXX", 1, 0) // this should return the same object as id1 + val id3 = BlockManagerId("e1", "XXX", 2, 0) // this should return a different object assert(id2 === id1, "id2 is not same as id1") assert(id2.eq(id1), "id2 is not the same object as id1") assert(id3 != id1, "id3 is same as id1") -- cgit v1.2.3
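
Note on the technique: the commit message explains that a fixed shuffle-sender port collides
when several executor processes run on one machine, so the port is now configurable and
defaults to 0. For reference, below is a minimal, self-contained sketch of the underlying
ephemeral-port pattern with Netty: bind to port 0 and read back the port the OS actually
assigned. It uses the released Netty 4.x API, which differs slightly from the pre-release
Netty build this patch targets, and the class name is illustrative rather than taken from the
Spark sources.

    import java.net.InetSocketAddress;

    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.Channel;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;

    /** Sketch: bind to an ephemeral port (0) and report the port the OS assigned. */
    public class EphemeralPortServer {
      public static void main(String[] args) throws InterruptedException {
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
          ServerBootstrap bootstrap = new ServerBootstrap()
              .group(bossGroup, workerGroup)
              .channel(NioServerSocketChannel.class)
              .childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) {
                  // Real handlers (e.g. a file-serving pipeline) would be added here.
                }
              });

          // Port 0 asks the OS for any free port, so concurrent processes never collide.
          Channel channel = bootstrap.bind(0).sync().channel();

          // The assigned port is only known after the bind completes; read it from the channel.
          int actualPort = ((InetSocketAddress) channel.localAddress()).getPort();
          System.out.println("Bound to port " + actualPort);

          channel.close().sync();
        } finally {
          bossGroup.shutdownGracefully();
          workerGroup.shutdownGracefully();
        }
      }
    }

However the bound port is obtained, the design point in this patch is that the port is then
advertised to reducers through BlockManagerId.nettyPort rather than being read from the global
spark.shuffle.sender.port property on the fetching side.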