aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala21
-rw-r--r--core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala8
-rw-r--r--core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala33
-rw-r--r--core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala3
-rw-r--r--docs/configuration.md9
5 files changed, 68 insertions, 6 deletions
diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
index 9939103bb0..49329423dc 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -101,6 +101,9 @@ private[spark] class ExecutorAllocationManager(
private val executorIdleTimeoutS = conf.getTimeAsSeconds(
"spark.dynamicAllocation.executorIdleTimeout", "60s")
+ private val cachedExecutorIdleTimeoutS = conf.getTimeAsSeconds(
+ "spark.dynamicAllocation.cachedExecutorIdleTimeout", s"${2 * executorIdleTimeoutS}s")
+
// During testing, the methods to actually kill and add executors are mocked out
private val testing = conf.getBoolean("spark.dynamicAllocation.testing", false)
@@ -459,9 +462,23 @@ private[spark] class ExecutorAllocationManager(
private def onExecutorIdle(executorId: String): Unit = synchronized {
if (executorIds.contains(executorId)) {
if (!removeTimes.contains(executorId) && !executorsPendingToRemove.contains(executorId)) {
+ // Note that it is not necessary to query the executors since all the cached
+ // blocks we are concerned with are reported to the driver. Note that this
+ // does not include broadcast blocks.
+ val hasCachedBlocks = SparkEnv.get.blockManager.master.hasCachedBlocks(executorId)
+ val now = clock.getTimeMillis()
+ val timeout = {
+ if (hasCachedBlocks) {
+ // Use a different timeout if the executor has cached blocks.
+ now + cachedExecutorIdleTimeoutS * 1000
+ } else {
+ now + executorIdleTimeoutS * 1000
+ }
+ }
+ val realTimeout = if (timeout <= 0) Long.MaxValue else timeout // overflow
+ removeTimes(executorId) = realTimeout
logDebug(s"Starting idle timer for $executorId because there are no more tasks " +
- s"scheduled to run on the executor (to expire in $executorIdleTimeoutS seconds)")
- removeTimes(executorId) = clock.getTimeMillis + executorIdleTimeoutS * 1000
+ s"scheduled to run on the executor (to expire in ${(realTimeout - now)/1000} seconds)")
}
} else {
logWarning(s"Attempted to mark unknown executor $executorId idle")
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
index abcad9438b..7cdae22b0e 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
@@ -202,6 +202,14 @@ class BlockManagerMaster(
Await.result(future, timeout)
}
+ /**
+ * Find out if the executor has cached blocks. This method does not consider broadcast blocks,
+ * since they are not reported to the master.
+ */
+ def hasCachedBlocks(executorId: String): Boolean = {
+ driverEndpoint.askWithRetry[Boolean](HasCachedBlocks(executorId))
+ }
+
/** Stop the driver endpoint, called only on the Spark driver node */
def stop() {
if (driverEndpoint != null && isDriver) {
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
index 2cd8c5297b..68ed909673 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
@@ -19,6 +19,7 @@ package org.apache.spark.storage
import java.util.{HashMap => JHashMap}
+import scala.collection.immutable.HashSet
import scala.collection.mutable
import scala.collection.JavaConversions._
import scala.concurrent.{ExecutionContext, Future}
@@ -112,6 +113,17 @@ class BlockManagerMasterEndpoint(
case BlockManagerHeartbeat(blockManagerId) =>
context.reply(heartbeatReceived(blockManagerId))
+ case HasCachedBlocks(executorId) =>
+ blockManagerIdByExecutor.get(executorId) match {
+ case Some(bm) =>
+ if (blockManagerInfo.contains(bm)) {
+ val bmInfo = blockManagerInfo(bm)
+ context.reply(bmInfo.cachedBlocks.nonEmpty)
+ } else {
+ context.reply(false)
+ }
+ case None => context.reply(false)
+ }
}
private def removeRdd(rddId: Int): Future[Seq[Int]] = {
@@ -418,6 +430,9 @@ private[spark] class BlockManagerInfo(
// Mapping from block id to its status.
private val _blocks = new JHashMap[BlockId, BlockStatus]
+ // Cached blocks held by this BlockManager. This does not include broadcast blocks.
+ private val _cachedBlocks = new mutable.HashSet[BlockId]
+
def getStatus(blockId: BlockId): Option[BlockStatus] = Option(_blocks.get(blockId))
def updateLastSeenMs() {
@@ -451,27 +466,35 @@ private[spark] class BlockManagerInfo(
* and the diskSize here indicates the data size in or dropped to disk.
* They can be both larger than 0, when a block is dropped from memory to disk.
* Therefore, a safe way to set BlockStatus is to set its info in accurate modes. */
+ var blockStatus: BlockStatus = null
if (storageLevel.useMemory) {
- _blocks.put(blockId, BlockStatus(storageLevel, memSize, 0, 0))
+ blockStatus = BlockStatus(storageLevel, memSize, 0, 0)
+ _blocks.put(blockId, blockStatus)
_remainingMem -= memSize
logInfo("Added %s in memory on %s (size: %s, free: %s)".format(
blockId, blockManagerId.hostPort, Utils.bytesToString(memSize),
Utils.bytesToString(_remainingMem)))
}
if (storageLevel.useDisk) {
- _blocks.put(blockId, BlockStatus(storageLevel, 0, diskSize, 0))
+ blockStatus = BlockStatus(storageLevel, 0, diskSize, 0)
+ _blocks.put(blockId, blockStatus)
logInfo("Added %s on disk on %s (size: %s)".format(
blockId, blockManagerId.hostPort, Utils.bytesToString(diskSize)))
}
if (storageLevel.useOffHeap) {
- _blocks.put(blockId, BlockStatus(storageLevel, 0, 0, externalBlockStoreSize))
+ blockStatus = BlockStatus(storageLevel, 0, 0, externalBlockStoreSize)
+ _blocks.put(blockId, blockStatus)
logInfo("Added %s on ExternalBlockStore on %s (size: %s)".format(
blockId, blockManagerId.hostPort, Utils.bytesToString(externalBlockStoreSize)))
}
+ if (!blockId.isBroadcast && blockStatus.isCached) {
+ _cachedBlocks += blockId
+ }
} else if (_blocks.containsKey(blockId)) {
// If isValid is not true, drop the block.
val blockStatus: BlockStatus = _blocks.get(blockId)
_blocks.remove(blockId)
+ _cachedBlocks -= blockId
if (blockStatus.storageLevel.useMemory) {
logInfo("Removed %s on %s in memory (size: %s, free: %s)".format(
blockId, blockManagerId.hostPort, Utils.bytesToString(blockStatus.memSize),
@@ -494,6 +517,7 @@ private[spark] class BlockManagerInfo(
_remainingMem += _blocks.get(blockId).memSize
_blocks.remove(blockId)
}
+ _cachedBlocks -= blockId
}
def remainingMem: Long = _remainingMem
@@ -502,6 +526,9 @@ private[spark] class BlockManagerInfo(
def blocks: JHashMap[BlockId, BlockStatus] = _blocks
+ // This does not include broadcast blocks.
+ def cachedBlocks: collection.Set[BlockId] = _cachedBlocks
+
override def toString: String = "BlockManagerInfo " + timeMs + " " + _remainingMem
def clear() {
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
index 1683576067..376e9eb488 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
@@ -42,7 +42,6 @@ private[spark] object BlockManagerMessages {
case class RemoveBroadcast(broadcastId: Long, removeFromDriver: Boolean = true)
extends ToBlockManagerSlave
-
//////////////////////////////////////////////////////////////////////////////////
// Messages from slaves to the master.
//////////////////////////////////////////////////////////////////////////////////
@@ -108,4 +107,6 @@ private[spark] object BlockManagerMessages {
extends ToBlockManagerMaster
case class BlockManagerHeartbeat(blockManagerId: BlockManagerId) extends ToBlockManagerMaster
+
+ case class HasCachedBlocks(executorId: String) extends ToBlockManagerMaster
}
diff --git a/docs/configuration.md b/docs/configuration.md
index 3a48da4592..9667cebe0b 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -1202,6 +1202,15 @@ Apart from these, the following properties are also available, and may be useful
</td>
</tr>
<tr>
+ <td><code>spark.dynamicAllocation.cachedExecutorIdleTimeout</code></td>
+ <td>2 * <code>spark.dynamicAllocation.executorIdleTimeout</code></td>
+ <td>
+ If dynamic allocation is enabled and an executor which has cached data blocks has been idle for more than this duration,
+ the executor will be removed. For more details, see this
+ <a href="job-scheduling.html#resource-allocation-policy">description</a>.
+ </td>
+</tr>
+<tr>
<td><code>spark.dynamicAllocation.initialExecutors</code></td>
<td><code>spark.dynamicAllocation.minExecutors</code></td>
<td>