diff options
author | Aaron Davidson <aaron@databricks.com> | 2014-11-06 18:39:14 -0800 |
---|---|---|
committer | Reynold Xin <rxin@databricks.com> | 2014-11-06 18:39:14 -0800 |
commit | f165b2bbf5d4acf34d826fa55b900f5bbc295654 (patch) | |
tree | 083430602410d4d9c1884b2092aa599a24cc35a4 /core | |
parent | 6e9ef10fd7446a11f37446c961916ba2a8e02cb8 (diff) | |
download | spark-f165b2bbf5d4acf34d826fa55b900f5bbc295654.tar.gz spark-f165b2bbf5d4acf34d826fa55b900f5bbc295654.tar.bz2 spark-f165b2bbf5d4acf34d826fa55b900f5bbc295654.zip |
[SPARK-4188] [Core] Perform network-level retry of shuffle file fetches
This adds a RetryingBlockFetcher to the NettyBlockTransferService which is wrapped around our typical OneForOneBlockFetcher, adding retry logic in the event of an IOException.
This sort of retry allows us to avoid marking an entire executor as failed due to garbage collection or high network load.
TODO:
- [x] unit tests
- [x] put in ExternalShuffleClient too
Author: Aaron Davidson <aaron@databricks.com>
Closes #3101 from aarondav/retry and squashes the following commits:
72a2a32 [Aaron Davidson] Add that we should remove the condition around the retry thingy
c7fd107 [Aaron Davidson] Fix unit tests
e80e4c2 [Aaron Davidson] Address initial comments
6f594cd [Aaron Davidson] Fix unit test
05ff43c [Aaron Davidson] Add to external shuffle client and add unit test
66e5a24 [Aaron Davidson] [SPARK-4238] [Core] Perform network-level retry of shuffle file fetches
Diffstat (limited to 'core')
-rw-r--r-- | core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala | 21 |
1 files changed, 17 insertions, 4 deletions
diff --git a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala index 0d1fc81d2a..b937ea825f 100644 --- a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala +++ b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala @@ -27,7 +27,7 @@ import org.apache.spark.network.client.{TransportClientBootstrap, RpcResponseCal import org.apache.spark.network.netty.NettyMessages.{OpenBlocks, UploadBlock} import org.apache.spark.network.sasl.{SaslRpcHandler, SaslClientBootstrap} import org.apache.spark.network.server._ -import org.apache.spark.network.shuffle.{BlockFetchingListener, OneForOneBlockFetcher} +import org.apache.spark.network.shuffle.{RetryingBlockFetcher, BlockFetchingListener, OneForOneBlockFetcher} import org.apache.spark.serializer.JavaSerializer import org.apache.spark.storage.{BlockId, StorageLevel} import org.apache.spark.util.Utils @@ -71,9 +71,22 @@ class NettyBlockTransferService(conf: SparkConf, securityManager: SecurityManage listener: BlockFetchingListener): Unit = { logTrace(s"Fetch blocks from $host:$port (executor id $execId)") try { - val client = clientFactory.createClient(host, port) - new OneForOneBlockFetcher(client, blockIds.toArray, listener) - .start(OpenBlocks(blockIds.map(BlockId.apply))) + val blockFetchStarter = new RetryingBlockFetcher.BlockFetchStarter { + override def createAndStart(blockIds: Array[String], listener: BlockFetchingListener) { + val client = clientFactory.createClient(host, port) + new OneForOneBlockFetcher(client, blockIds.toArray, listener) + .start(OpenBlocks(blockIds.map(BlockId.apply))) + } + } + + val maxRetries = transportConf.maxIORetries() + if (maxRetries > 0) { + // Note this Fetcher will correctly handle maxRetries == 0; we avoid it just in case there's + // a bug in this code. We should remove the if statement once we're sure of the stability. + new RetryingBlockFetcher(transportConf, blockFetchStarter, blockIds, listener).start() + } else { + blockFetchStarter.createAndStart(blockIds, listener) + } } catch { case e: Exception => logError("Exception while beginning fetchBlocks", e) |