diff options
Diffstat (limited to 'core/src/main/scala/spark/network/netty/ShuffleSender.scala')
-rw-r--r-- | core/src/main/scala/spark/network/netty/ShuffleSender.scala | 53 |
1 files changed, 53 insertions, 0 deletions
diff --git a/core/src/main/scala/spark/network/netty/ShuffleSender.scala b/core/src/main/scala/spark/network/netty/ShuffleSender.scala new file mode 100644 index 0000000000..d6fa4b1e80 --- /dev/null +++ b/core/src/main/scala/spark/network/netty/ShuffleSender.scala @@ -0,0 +1,53 @@ +package spark.network.netty + +import java.io.File + +import spark.Logging + + +private[spark] class ShuffleSender(portIn: Int, val pResolver: PathResolver) extends Logging { + + val server = new FileServer(pResolver, portIn) + server.start() + + def stop() { + server.stop() + } + + def port: Int = server.getPort() +} + + +/** + * An application for testing the shuffle sender as a standalone program. + */ +private[spark] object ShuffleSender { + + def main(args: Array[String]) { + if (args.length < 3) { + System.err.println( + "Usage: ShuffleSender <port> <subDirsPerLocalDir> <list of shuffle_block_directories>") + System.exit(1) + } + + val port = args(0).toInt + val subDirsPerLocalDir = args(1).toInt + val localDirs = args.drop(2).map(new File(_)) + + val pResovler = new PathResolver { + override def getAbsolutePath(blockId: String): String = { + if (!blockId.startsWith("shuffle_")) { + throw new Exception("Block " + blockId + " is not a shuffle block") + } + // Figure out which local directory it hashes to, and which subdirectory in that + val hash = math.abs(blockId.hashCode) + val dirId = hash % localDirs.length + val subDirId = (hash / localDirs.length) % subDirsPerLocalDir + val subDir = new File(localDirs(dirId), "%02x".format(subDirId)) + val file = new File(subDir, blockId) + return file.getAbsolutePath + } + } + val sender = new ShuffleSender(port, pResovler) + } +} |