diff options
author | Nezih Yigitbasi <nyigitbasi@netflix.com> | 2016-03-11 11:11:53 -0800 |
---|---|---|
committer | Andrew Or <andrew@databricks.com> | 2016-03-11 11:11:53 -0800 |
commit | ff776b2fc1cd4c571fd542dbf807e6fa3373cb34 (patch) | |
tree | 64e040c08ff39e3914130dc2fcb163fbf368f508 | |
parent | eb650a81f14fa7bc665856397e19ddf1a92ca3c5 (diff) | |
download | spark-ff776b2fc1cd4c571fd542dbf807e6fa3373cb34.tar.gz spark-ff776b2fc1cd4c571fd542dbf807e6fa3373cb34.tar.bz2 spark-ff776b2fc1cd4c571fd542dbf807e6fa3373cb34.zip |
[SPARK-13328][CORE] Poor read performance for broadcast variables with dynamic resource allocation
When dynamic resource allocation is enabled fetching broadcast variables from removed executors were causing job failures and SPARK-9591 fixed this problem by trying all locations of a block before giving up. However, the locations of a block is retrieved only once from the driver in this process and the locations in this list can be stale due to dynamic resource allocation. This situation gets worse when running on a large cluster as the size of this location list can be in the order of several hundreds out of which there may be tens of stale entries. What we have observed is with the default settings of 3 max retries and 5s between retries (that's 15s per location) the time it takes to read a broadcast variable can be as high as ~17m (70 failed attempts * 15s/attempt)
Author: Nezih Yigitbasi <nyigitbasi@netflix.com>
Closes #11241 from nezihyigitbasi/SPARK-13328.
-rw-r--r-- | core/src/main/scala/org/apache/spark/storage/BlockManager.scala | 47 | ||||
-rw-r--r-- | core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala | 84 |
2 files changed, 116 insertions, 15 deletions
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 873330e136..bcf65e9d7e 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -133,6 +133,9 @@ private[spark] class BlockManager( private val compressRdds = conf.getBoolean("spark.rdd.compress", false) // Whether to compress shuffle output temporarily spilled to disk private val compressShuffleSpill = conf.getBoolean("spark.shuffle.spill.compress", true) + // Max number of failures before this block manager refreshes the block locations from the driver + private val maxFailuresBeforeLocationRefresh = + conf.getInt("spark.block.failures.beforeLocationRefresh", 5) private val slaveEndpoint = rpcEnv.setupEndpoint( "BlockManagerEndpoint" + BlockManager.ID_GENERATOR.next, @@ -568,26 +571,46 @@ private[spark] class BlockManager( def getRemoteBytes(blockId: BlockId): Option[ByteBuffer] = { logDebug(s"Getting remote block $blockId") require(blockId != null, "BlockId is null") + var runningFailureCount = 0 + var totalFailureCount = 0 val locations = getLocations(blockId) - var numFetchFailures = 0 - for (loc <- locations) { + val maxFetchFailures = locations.size + var locationIterator = locations.iterator + while (locationIterator.hasNext) { + val loc = locationIterator.next() logDebug(s"Getting remote block $blockId from $loc") val data = try { blockTransferService.fetchBlockSync( loc.host, loc.port, loc.executorId, blockId.toString).nioByteBuffer() } catch { case NonFatal(e) => - numFetchFailures += 1 - if (numFetchFailures == locations.size) { - // An exception is thrown while fetching this block from all locations - throw new BlockFetchException(s"Failed to fetch block from" + - s" ${locations.size} locations. Most recent failure cause:", e) - } else { - // This location failed, so we retry fetch from a different one by returning null here - logWarning(s"Failed to fetch remote block $blockId " + - s"from $loc (failed attempt $numFetchFailures)", e) - null + runningFailureCount += 1 + totalFailureCount += 1 + + if (totalFailureCount >= maxFetchFailures) { + // Give up trying anymore locations. Either we've tried all of the original locations, + // or we've refreshed the list of locations from the master, and have still + // hit failures after trying locations from the refreshed list. + throw new BlockFetchException(s"Failed to fetch block after" + + s" ${totalFailureCount} fetch failures. Most recent failure cause:", e) + } + + logWarning(s"Failed to fetch remote block $blockId " + + s"from $loc (failed attempt $runningFailureCount)", e) + + // If there is a large number of executors then locations list can contain a + // large number of stale entries causing a large number of retries that may + // take a significant amount of time. To get rid of these stale entries + // we refresh the block locations after a certain number of fetch failures + if (runningFailureCount >= maxFailuresBeforeLocationRefresh) { + locationIterator = getLocations(blockId).iterator + logDebug(s"Refreshed locations from the driver " + + s"after ${runningFailureCount} fetch failures.") + runningFailureCount = 0 } + + // This location failed, so we retry fetch from a different one by returning null here + null } if (data != null) { diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala index 42595c8cf2..dc4be14677 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala @@ -21,11 +21,12 @@ import java.nio.ByteBuffer import scala.collection.mutable.ArrayBuffer import scala.concurrent.duration._ +import scala.concurrent.Future import scala.language.implicitConversions import scala.language.postfixOps import org.mockito.{Matchers => mc} -import org.mockito.Mockito.{mock, when} +import org.mockito.Mockito.{mock, times, verify, when} import org.scalatest._ import org.scalatest.concurrent.Eventually._ import org.scalatest.concurrent.Timeouts._ @@ -33,7 +34,10 @@ import org.scalatest.concurrent.Timeouts._ import org.apache.spark._ import org.apache.spark.executor.DataReadMethod import org.apache.spark.memory.StaticMemoryManager +import org.apache.spark.network.{BlockDataManager, BlockTransferService} +import org.apache.spark.network.buffer.{ManagedBuffer, NioManagedBuffer} import org.apache.spark.network.netty.NettyBlockTransferService +import org.apache.spark.network.shuffle.BlockFetchingListener import org.apache.spark.rpc.RpcEnv import org.apache.spark.scheduler.LiveListenerBus import org.apache.spark.serializer.{JavaSerializer, KryoSerializer} @@ -66,9 +70,11 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE private def makeBlockManager( maxMem: Long, name: String = SparkContext.DRIVER_IDENTIFIER, - master: BlockManagerMaster = this.master): BlockManager = { + master: BlockManagerMaster = this.master, + transferService: Option[BlockTransferService] = Option.empty): BlockManager = { val serializer = new KryoSerializer(conf) - val transfer = new NettyBlockTransferService(conf, securityMgr, numCores = 1) + val transfer = transferService + .getOrElse(new NettyBlockTransferService(conf, securityMgr, numCores = 1)) val memManager = new StaticMemoryManager(conf, Long.MaxValue, maxMem, numCores = 1) val blockManager = new BlockManager(name, rpcEnv, master, serializer, conf, memManager, mapOutputTracker, shuffleManager, transfer, securityMgr, 0) @@ -1287,6 +1293,78 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE assert(store.getSingle("a1").isDefined, "a1 was not in store") assert(store.getSingle("a3").isDefined, "a3 was not in store") } + + test("SPARK-13328: refresh block locations (fetch should fail after hitting a threshold)") { + val mockBlockTransferService = + new MockBlockTransferService(conf.getInt("spark.block.failures.beforeLocationRefresh", 5)) + store = makeBlockManager(8000, "executor1", transferService = Option(mockBlockTransferService)) + store.putSingle("item", 999L, StorageLevel.MEMORY_ONLY, tellMaster = true) + intercept[BlockFetchException] { + store.getRemoteBytes("item") + } + } + + test("SPARK-13328: refresh block locations (fetch should succeed after location refresh)") { + val maxFailuresBeforeLocationRefresh = + conf.getInt("spark.block.failures.beforeLocationRefresh", 5) + val mockBlockManagerMaster = mock(classOf[BlockManagerMaster]) + val mockBlockTransferService = + new MockBlockTransferService(maxFailuresBeforeLocationRefresh) + // make sure we have more than maxFailuresBeforeLocationRefresh locations + // so that we have a chance to do location refresh + val blockManagerIds = (0 to maxFailuresBeforeLocationRefresh) + .map { i => BlockManagerId(s"id-$i", s"host-$i", i + 1) } + when(mockBlockManagerMaster.getLocations(mc.any[BlockId])).thenReturn(blockManagerIds) + store = makeBlockManager(8000, "executor1", mockBlockManagerMaster, + transferService = Option(mockBlockTransferService)) + val block = store.getRemoteBytes("item") + .asInstanceOf[Option[ByteBuffer]] + assert(block.isDefined) + verify(mockBlockManagerMaster, times(2)).getLocations("item") + } + + class MockBlockTransferService(val maxFailures: Int) extends BlockTransferService { + var numCalls = 0 + + override def init(blockDataManager: BlockDataManager): Unit = {} + + override def fetchBlocks( + host: String, + port: Int, + execId: String, + blockIds: Array[String], + listener: BlockFetchingListener): Unit = { + listener.onBlockFetchSuccess("mockBlockId", new NioManagedBuffer(ByteBuffer.allocate(1))) + } + + override def close(): Unit = {} + + override def hostName: String = { "MockBlockTransferServiceHost" } + + override def port: Int = { 63332 } + + override def uploadBlock( + hostname: String, + port: Int, execId: String, + blockId: BlockId, + blockData: ManagedBuffer, + level: StorageLevel): Future[Unit] = { + import scala.concurrent.ExecutionContext.Implicits.global + Future {} + } + + override def fetchBlockSync( + host: String, + port: Int, + execId: String, + blockId: String): ManagedBuffer = { + numCalls += 1 + if (numCalls <= maxFailures) { + throw new RuntimeException("Failing block fetch in the mock block transfer service") + } + super.fetchBlockSync(host, port, execId, blockId) + } + } } private object BlockManagerSuite { |