From b493f55a4fe43c83061a361eef029edbac50c006 Mon Sep 17 00:00:00 2001 From: shane-huang Date: Tue, 16 Apr 2013 10:00:33 +0800 Subject: fix a bug in netty Block Fetcher Signed-off-by: shane-huang --- .../main/java/spark/network/netty/FileServer.java | 1 - .../main/scala/spark/storage/BlockManager.scala | 69 +++++++++++----------- core/src/main/scala/spark/storage/DiskStore.scala | 4 +- 3 files changed, 37 insertions(+), 37 deletions(-) diff --git a/core/src/main/java/spark/network/netty/FileServer.java b/core/src/main/java/spark/network/netty/FileServer.java index 729e45f0a1..38af305096 100644 --- a/core/src/main/java/spark/network/netty/FileServer.java +++ b/core/src/main/java/spark/network/netty/FileServer.java @@ -51,7 +51,6 @@ public class FileServer { } if (bootstrap != null){ bootstrap.shutdown(); - bootstrap = null; } } } diff --git a/core/src/main/scala/spark/storage/BlockManager.scala b/core/src/main/scala/spark/storage/BlockManager.scala index b8b68d4283..5a00180922 100644 --- a/core/src/main/scala/spark/storage/BlockManager.scala +++ b/core/src/main/scala/spark/storage/BlockManager.scala @@ -469,21 +469,6 @@ class BlockManager( getLocal(blockId).orElse(getRemote(blockId)) } - /** - * A request to fetch one or more blocks, complete with their sizes - */ - class FetchRequest(val address: BlockManagerId, val blocks: Seq[(String, Long)]) { - val size = blocks.map(_._2).sum - } - - /** - * A result of a fetch. Includes the block ID, size in bytes, and a function to deserialize - * the block (since we want all deserializaton to happen in the calling thread); can also - * represent a fetch failure if size == -1. - */ - class FetchResult(val blockId: String, val size: Long, val deserialize: () => Iterator[Any]) { - def failed: Boolean = size == -1 - } /** * Get multiple blocks from local and remote block manager using their BlockManagerIds. Returns * an Iterator of (block ID, value) pairs so that clients may handle blocks in a pipelined @@ -494,9 +479,9 @@ class BlockManager( : BlockFetcherIterator = { if(System.getProperty("spark.shuffle.use.netty", "false").toBoolean){ - return new NettyBlockFetcherIterator(this, blocksByAddress) + return BlockFetcherIterator("netty",this, blocksByAddress) } else { - return new BlockFetcherIterator(this, blocksByAddress) + return BlockFetcherIterator("", this, blocksByAddress) } } @@ -916,10 +901,29 @@ object BlockManager extends Logging { } } -class BlockFetcherIterator( + +trait BlockFetcherIterator extends Iterator[(String, Option[Iterator[Any]])] with Logging with BlockFetchTracker { + def initialize +} + +object BlockFetcherIterator { + + // A request to fetch one or more blocks, complete with their sizes + class FetchRequest(val address: BlockManagerId, val blocks: Seq[(String, Long)]) { + val size = blocks.map(_._2).sum + } + + // A result of a fetch. Includes the block ID, size in bytes, and a function to deserialize + // the block (since we want all deserializaton to happen in the calling thread); can also + // represent a fetch failure if size == -1. + class FetchResult(val blockId: String, val size: Long, val deserialize: () => Iterator[Any]) { + def failed: Boolean = size == -1 + } + +class BasicBlockFetcherIterator( private val blockManager: BlockManager, val blocksByAddress: Seq[(BlockManagerId, Seq[(String, Long)])] -) extends Iterator[(String, Option[Iterator[Any]])] with Logging with BlockFetchTracker { +) extends BlockFetcherIterator { import blockManager._ @@ -936,21 +940,9 @@ class BlockFetcherIterator( val localBlockIds = new ArrayBuffer[String]() val remoteBlockIds = new HashSet[String]() - // A result of a fetch. Includes the block ID, size in bytes, and a function to deserialize - // the block (since we want all deserializaton to happen in the calling thread); can also - // represent a fetch failure if size == -1. - class FetchResult(val blockId: String, val size: Long, val deserialize: () => Iterator[Any]) { - def failed: Boolean = size == -1 - } - // A queue to hold our results. val results = new LinkedBlockingQueue[FetchResult] - // A request to fetch one or more blocks, complete with their sizes - class FetchRequest(val address: BlockManagerId, val blocks: Seq[(String, Long)]) { - val size = blocks.map(_._2).sum - } - // Queue of fetch requests to issue; we'll pull requests off this gradually to make sure that // the number of bytes in flight is limited to maxBytesInFlight val fetchRequests = new Queue[FetchRequest] @@ -1072,7 +1064,6 @@ class BlockFetcherIterator( } - initialize() //an iterator that will read fetched blocks off the queue as they arrive. var resultsGotten = 0 @@ -1107,7 +1098,7 @@ class BlockFetcherIterator( class NettyBlockFetcherIterator( blockManager: BlockManager, blocksByAddress: Seq[(BlockManagerId, Seq[(String, Long)])] -) extends BlockFetcherIterator(blockManager,blocksByAddress) { +) extends BasicBlockFetcherIterator(blockManager,blocksByAddress) { import blockManager._ @@ -1129,7 +1120,7 @@ class NettyBlockFetcherIterator( } } catch { case x: InterruptedException => logInfo("Copier Interrupted") - case _ => throw new SparkException("Exception Throw in Shuffle Copier") + //case _ => throw new SparkException("Exception Throw in Shuffle Copier") } } } @@ -1232,3 +1223,13 @@ class NettyBlockFetcherIterator( } } + def apply(t: String, + blockManager: BlockManager, + blocksByAddress: Seq[(BlockManagerId, Seq[(String, Long)])]): BlockFetcherIterator = { + val iter = if (t == "netty") { new NettyBlockFetcherIterator(blockManager,blocksByAddress) } + else { new BasicBlockFetcherIterator(blockManager,blocksByAddress) } + iter.initialize + iter + } + +} diff --git a/core/src/main/scala/spark/storage/DiskStore.scala b/core/src/main/scala/spark/storage/DiskStore.scala index d702bb23e0..cc5bf29a32 100644 --- a/core/src/main/scala/spark/storage/DiskStore.scala +++ b/core/src/main/scala/spark/storage/DiskStore.scala @@ -39,7 +39,7 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String) addShutdownHook() if(useNetty){ - startShuffleBlockSender() + startShuffleBlockSender() } override def getSize(blockId: String): Long = { @@ -229,7 +229,7 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String) case e: Exception => { logError("Error running ShuffleBlockSender ", e) if (shuffleSender != null) { - shuffleSender.stop + shuffleSender.stop shuffleSender = null } } -- cgit v1.2.3