aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorshane-huang <shengsheng.huang@intel.com>2013-04-16 10:00:33 +0800
committershane-huang <shengsheng.huang@intel.com>2013-04-16 10:01:01 +0800
commitb493f55a4fe43c83061a361eef029edbac50c006 (patch)
tree3ab993039e6865f0e1f25f0e60c7f4272cc7cf3a
parentdf47b40b764e25cbd10ce49d7152e1d33f51a263 (diff)
downloadspark-b493f55a4fe43c83061a361eef029edbac50c006.tar.gz
spark-b493f55a4fe43c83061a361eef029edbac50c006.tar.bz2
spark-b493f55a4fe43c83061a361eef029edbac50c006.zip
fix a bug in netty Block Fetcher
Signed-off-by: shane-huang <shengsheng.huang@intel.com>
-rw-r--r--core/src/main/java/spark/network/netty/FileServer.java1
-rw-r--r--core/src/main/scala/spark/storage/BlockManager.scala69
-rw-r--r--core/src/main/scala/spark/storage/DiskStore.scala4
3 files changed, 37 insertions, 37 deletions
diff --git a/core/src/main/java/spark/network/netty/FileServer.java b/core/src/main/java/spark/network/netty/FileServer.java
index 729e45f0a1..38af305096 100644
--- a/core/src/main/java/spark/network/netty/FileServer.java
+++ b/core/src/main/java/spark/network/netty/FileServer.java
@@ -51,7 +51,6 @@ public class FileServer {
}
if (bootstrap != null){
bootstrap.shutdown();
- bootstrap = null;
}
}
}
diff --git a/core/src/main/scala/spark/storage/BlockManager.scala b/core/src/main/scala/spark/storage/BlockManager.scala
index b8b68d4283..5a00180922 100644
--- a/core/src/main/scala/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/spark/storage/BlockManager.scala
@@ -470,21 +470,6 @@ class BlockManager(
}
/**
- * A request to fetch one or more blocks, complete with their sizes
- */
- class FetchRequest(val address: BlockManagerId, val blocks: Seq[(String, Long)]) {
- val size = blocks.map(_._2).sum
- }
-
- /**
- * A result of a fetch. Includes the block ID, size in bytes, and a function to deserialize
- * the block (since we want all deserializaton to happen in the calling thread); can also
- * represent a fetch failure if size == -1.
- */
- class FetchResult(val blockId: String, val size: Long, val deserialize: () => Iterator[Any]) {
- def failed: Boolean = size == -1
- }
- /**
* Get multiple blocks from local and remote block manager using their BlockManagerIds. Returns
* an Iterator of (block ID, value) pairs so that clients may handle blocks in a pipelined
* fashion as they're received. Expects a size in bytes to be provided for each block fetched,
@@ -494,9 +479,9 @@ class BlockManager(
: BlockFetcherIterator = {
if(System.getProperty("spark.shuffle.use.netty", "false").toBoolean){
- return new NettyBlockFetcherIterator(this, blocksByAddress)
+ return BlockFetcherIterator("netty",this, blocksByAddress)
} else {
- return new BlockFetcherIterator(this, blocksByAddress)
+ return BlockFetcherIterator("", this, blocksByAddress)
}
}
@@ -916,10 +901,29 @@ object BlockManager extends Logging {
}
}
-class BlockFetcherIterator(
+
+trait BlockFetcherIterator extends Iterator[(String, Option[Iterator[Any]])] with Logging with BlockFetchTracker {
+ def initialize
+}
+
+object BlockFetcherIterator {
+
+ // A request to fetch one or more blocks, complete with their sizes
+ class FetchRequest(val address: BlockManagerId, val blocks: Seq[(String, Long)]) {
+ val size = blocks.map(_._2).sum
+ }
+
+ // A result of a fetch. Includes the block ID, size in bytes, and a function to deserialize
+ // the block (since we want all deserializaton to happen in the calling thread); can also
+ // represent a fetch failure if size == -1.
+ class FetchResult(val blockId: String, val size: Long, val deserialize: () => Iterator[Any]) {
+ def failed: Boolean = size == -1
+ }
+
+class BasicBlockFetcherIterator(
private val blockManager: BlockManager,
val blocksByAddress: Seq[(BlockManagerId, Seq[(String, Long)])]
-) extends Iterator[(String, Option[Iterator[Any]])] with Logging with BlockFetchTracker {
+) extends BlockFetcherIterator {
import blockManager._
@@ -936,21 +940,9 @@ class BlockFetcherIterator(
val localBlockIds = new ArrayBuffer[String]()
val remoteBlockIds = new HashSet[String]()
- // A result of a fetch. Includes the block ID, size in bytes, and a function to deserialize
- // the block (since we want all deserializaton to happen in the calling thread); can also
- // represent a fetch failure if size == -1.
- class FetchResult(val blockId: String, val size: Long, val deserialize: () => Iterator[Any]) {
- def failed: Boolean = size == -1
- }
-
// A queue to hold our results.
val results = new LinkedBlockingQueue[FetchResult]
- // A request to fetch one or more blocks, complete with their sizes
- class FetchRequest(val address: BlockManagerId, val blocks: Seq[(String, Long)]) {
- val size = blocks.map(_._2).sum
- }
-
// Queue of fetch requests to issue; we'll pull requests off this gradually to make sure that
// the number of bytes in flight is limited to maxBytesInFlight
val fetchRequests = new Queue[FetchRequest]
@@ -1072,7 +1064,6 @@ class BlockFetcherIterator(
}
- initialize()
//an iterator that will read fetched blocks off the queue as they arrive.
var resultsGotten = 0
@@ -1107,7 +1098,7 @@ class BlockFetcherIterator(
class NettyBlockFetcherIterator(
blockManager: BlockManager,
blocksByAddress: Seq[(BlockManagerId, Seq[(String, Long)])]
-) extends BlockFetcherIterator(blockManager,blocksByAddress) {
+) extends BasicBlockFetcherIterator(blockManager,blocksByAddress) {
import blockManager._
@@ -1129,7 +1120,7 @@ class NettyBlockFetcherIterator(
}
} catch {
case x: InterruptedException => logInfo("Copier Interrupted")
- case _ => throw new SparkException("Exception Throw in Shuffle Copier")
+ //case _ => throw new SparkException("Exception Throw in Shuffle Copier")
}
}
}
@@ -1232,3 +1223,13 @@ class NettyBlockFetcherIterator(
}
}
+ def apply(t: String,
+ blockManager: BlockManager,
+ blocksByAddress: Seq[(BlockManagerId, Seq[(String, Long)])]): BlockFetcherIterator = {
+ val iter = if (t == "netty") { new NettyBlockFetcherIterator(blockManager,blocksByAddress) }
+ else { new BasicBlockFetcherIterator(blockManager,blocksByAddress) }
+ iter.initialize
+ iter
+ }
+
+}
diff --git a/core/src/main/scala/spark/storage/DiskStore.scala b/core/src/main/scala/spark/storage/DiskStore.scala
index d702bb23e0..cc5bf29a32 100644
--- a/core/src/main/scala/spark/storage/DiskStore.scala
+++ b/core/src/main/scala/spark/storage/DiskStore.scala
@@ -39,7 +39,7 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String)
addShutdownHook()
if(useNetty){
- startShuffleBlockSender()
+ startShuffleBlockSender()
}
override def getSize(blockId: String): Long = {
@@ -229,7 +229,7 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String)
case e: Exception => {
logError("Error running ShuffleBlockSender ", e)
if (shuffleSender != null) {
- shuffleSender.stop
+ shuffleSender.stop
shuffleSender = null
}
}