diff options
author | shane-huang <shengsheng.huang@intel.com> | 2013-04-16 10:00:33 +0800 |
---|---|---|
committer | shane-huang <shengsheng.huang@intel.com> | 2013-04-16 10:01:01 +0800 |
commit | b493f55a4fe43c83061a361eef029edbac50c006 (patch) | |
tree | 3ab993039e6865f0e1f25f0e60c7f4272cc7cf3a | |
parent | df47b40b764e25cbd10ce49d7152e1d33f51a263 (diff) | |
download | spark-b493f55a4fe43c83061a361eef029edbac50c006.tar.gz spark-b493f55a4fe43c83061a361eef029edbac50c006.tar.bz2 spark-b493f55a4fe43c83061a361eef029edbac50c006.zip |
fix a bug in netty Block Fetcher
Signed-off-by: shane-huang <shengsheng.huang@intel.com>
-rw-r--r-- | core/src/main/java/spark/network/netty/FileServer.java | 1 | ||||
-rw-r--r-- | core/src/main/scala/spark/storage/BlockManager.scala | 69 | ||||
-rw-r--r-- | core/src/main/scala/spark/storage/DiskStore.scala | 4 |
3 files changed, 37 insertions, 37 deletions
diff --git a/core/src/main/java/spark/network/netty/FileServer.java b/core/src/main/java/spark/network/netty/FileServer.java index 729e45f0a1..38af305096 100644 --- a/core/src/main/java/spark/network/netty/FileServer.java +++ b/core/src/main/java/spark/network/netty/FileServer.java @@ -51,7 +51,6 @@ public class FileServer { } if (bootstrap != null){ bootstrap.shutdown(); - bootstrap = null; } } } diff --git a/core/src/main/scala/spark/storage/BlockManager.scala b/core/src/main/scala/spark/storage/BlockManager.scala index b8b68d4283..5a00180922 100644 --- a/core/src/main/scala/spark/storage/BlockManager.scala +++ b/core/src/main/scala/spark/storage/BlockManager.scala @@ -470,21 +470,6 @@ class BlockManager( } /** - * A request to fetch one or more blocks, complete with their sizes - */ - class FetchRequest(val address: BlockManagerId, val blocks: Seq[(String, Long)]) { - val size = blocks.map(_._2).sum - } - - /** - * A result of a fetch. Includes the block ID, size in bytes, and a function to deserialize - * the block (since we want all deserializaton to happen in the calling thread); can also - * represent a fetch failure if size == -1. - */ - class FetchResult(val blockId: String, val size: Long, val deserialize: () => Iterator[Any]) { - def failed: Boolean = size == -1 - } - /** * Get multiple blocks from local and remote block manager using their BlockManagerIds. Returns * an Iterator of (block ID, value) pairs so that clients may handle blocks in a pipelined * fashion as they're received. Expects a size in bytes to be provided for each block fetched, @@ -494,9 +479,9 @@ class BlockManager( : BlockFetcherIterator = { if(System.getProperty("spark.shuffle.use.netty", "false").toBoolean){ - return new NettyBlockFetcherIterator(this, blocksByAddress) + return BlockFetcherIterator("netty",this, blocksByAddress) } else { - return new BlockFetcherIterator(this, blocksByAddress) + return BlockFetcherIterator("", this, blocksByAddress) } } @@ -916,10 +901,29 @@ object BlockManager extends Logging { } } -class BlockFetcherIterator( + +trait BlockFetcherIterator extends Iterator[(String, Option[Iterator[Any]])] with Logging with BlockFetchTracker { + def initialize +} + +object BlockFetcherIterator { + + // A request to fetch one or more blocks, complete with their sizes + class FetchRequest(val address: BlockManagerId, val blocks: Seq[(String, Long)]) { + val size = blocks.map(_._2).sum + } + + // A result of a fetch. Includes the block ID, size in bytes, and a function to deserialize + // the block (since we want all deserializaton to happen in the calling thread); can also + // represent a fetch failure if size == -1. + class FetchResult(val blockId: String, val size: Long, val deserialize: () => Iterator[Any]) { + def failed: Boolean = size == -1 + } + +class BasicBlockFetcherIterator( private val blockManager: BlockManager, val blocksByAddress: Seq[(BlockManagerId, Seq[(String, Long)])] -) extends Iterator[(String, Option[Iterator[Any]])] with Logging with BlockFetchTracker { +) extends BlockFetcherIterator { import blockManager._ @@ -936,21 +940,9 @@ class BlockFetcherIterator( val localBlockIds = new ArrayBuffer[String]() val remoteBlockIds = new HashSet[String]() - // A result of a fetch. Includes the block ID, size in bytes, and a function to deserialize - // the block (since we want all deserializaton to happen in the calling thread); can also - // represent a fetch failure if size == -1. - class FetchResult(val blockId: String, val size: Long, val deserialize: () => Iterator[Any]) { - def failed: Boolean = size == -1 - } - // A queue to hold our results. val results = new LinkedBlockingQueue[FetchResult] - // A request to fetch one or more blocks, complete with their sizes - class FetchRequest(val address: BlockManagerId, val blocks: Seq[(String, Long)]) { - val size = blocks.map(_._2).sum - } - // Queue of fetch requests to issue; we'll pull requests off this gradually to make sure that // the number of bytes in flight is limited to maxBytesInFlight val fetchRequests = new Queue[FetchRequest] @@ -1072,7 +1064,6 @@ class BlockFetcherIterator( } - initialize() //an iterator that will read fetched blocks off the queue as they arrive. var resultsGotten = 0 @@ -1107,7 +1098,7 @@ class BlockFetcherIterator( class NettyBlockFetcherIterator( blockManager: BlockManager, blocksByAddress: Seq[(BlockManagerId, Seq[(String, Long)])] -) extends BlockFetcherIterator(blockManager,blocksByAddress) { +) extends BasicBlockFetcherIterator(blockManager,blocksByAddress) { import blockManager._ @@ -1129,7 +1120,7 @@ class NettyBlockFetcherIterator( } } catch { case x: InterruptedException => logInfo("Copier Interrupted") - case _ => throw new SparkException("Exception Throw in Shuffle Copier") + //case _ => throw new SparkException("Exception Throw in Shuffle Copier") } } } @@ -1232,3 +1223,13 @@ class NettyBlockFetcherIterator( } } + def apply(t: String, + blockManager: BlockManager, + blocksByAddress: Seq[(BlockManagerId, Seq[(String, Long)])]): BlockFetcherIterator = { + val iter = if (t == "netty") { new NettyBlockFetcherIterator(blockManager,blocksByAddress) } + else { new BasicBlockFetcherIterator(blockManager,blocksByAddress) } + iter.initialize + iter + } + +} diff --git a/core/src/main/scala/spark/storage/DiskStore.scala b/core/src/main/scala/spark/storage/DiskStore.scala index d702bb23e0..cc5bf29a32 100644 --- a/core/src/main/scala/spark/storage/DiskStore.scala +++ b/core/src/main/scala/spark/storage/DiskStore.scala @@ -39,7 +39,7 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String) addShutdownHook() if(useNetty){ - startShuffleBlockSender() + startShuffleBlockSender() } override def getSize(blockId: String): Long = { @@ -229,7 +229,7 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String) case e: Exception => { logError("Error running ShuffleBlockSender ", e) if (shuffleSender != null) { - shuffleSender.stop + shuffleSender.stop shuffleSender = null } } |