aboutsummaryrefslogtreecommitdiff
path: root/core/src/main/scala/org/apache
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/main/scala/org/apache')
-rw-r--r--core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala6
-rw-r--r--core/src/main/scala/org/apache/spark/storage/BlockManager.scala6
2 files changed, 11 insertions, 1 deletions
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala
index 490d45d12b..3db59837fb 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala
@@ -371,6 +371,12 @@ private[storage] class BlockInfoManager extends Logging {
blocksWithReleasedLocks
}
+ /** Returns the number of locks held by the given task. Used only for testing. */
+ private[storage] def getTaskLockCount(taskAttemptId: TaskAttemptId): Int = {
+ readLocksByTask.get(taskAttemptId).map(_.size()).getOrElse(0) +
+ writeLocksByTask.get(taskAttemptId).map(_.size).getOrElse(0)
+ }
+
/**
* Returns the number of blocks tracked.
*/
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 245d94ac4f..991346a40a 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -1187,7 +1187,7 @@ private[spark] class BlockManager(
blockId: BlockId,
existingReplicas: Set[BlockManagerId],
maxReplicas: Int): Unit = {
- logInfo(s"Pro-actively replicating $blockId")
+ logInfo(s"Using $blockManagerId to pro-actively replicate $blockId")
blockInfoManager.lockForReading(blockId).foreach { info =>
val data = doGetLocalBytes(blockId, info)
val storageLevel = StorageLevel(
@@ -1196,9 +1196,13 @@ private[spark] class BlockManager(
useOffHeap = info.level.useOffHeap,
deserialized = info.level.deserialized,
replication = maxReplicas)
+ // we know we are called as a result of an executor removal, so we refresh peer cache
+ // this way, we won't try to replicate to a missing executor with a stale reference
+ getPeers(forceFetch = true)
try {
replicate(blockId, data, storageLevel, info.classTag, existingReplicas)
} finally {
+ logDebug(s"Releasing lock for $blockId")
releaseLock(blockId)
}
}