diff options
Diffstat (limited to 'core/src/main/scala/org/apache')
-rw-r--r-- | core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala | 6 | ||||
-rw-r--r-- | core/src/main/scala/org/apache/spark/storage/BlockManager.scala | 6 |
2 files changed, 11 insertions, 1 deletions
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala index 490d45d12b..3db59837fb 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala @@ -371,6 +371,12 @@ private[storage] class BlockInfoManager extends Logging { blocksWithReleasedLocks } + /** Returns the number of locks held by the given task. Used only for testing. */ + private[storage] def getTaskLockCount(taskAttemptId: TaskAttemptId): Int = { + readLocksByTask.get(taskAttemptId).map(_.size()).getOrElse(0) + + writeLocksByTask.get(taskAttemptId).map(_.size).getOrElse(0) + } + /** * Returns the number of blocks tracked. */ diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 245d94ac4f..991346a40a 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -1187,7 +1187,7 @@ private[spark] class BlockManager( blockId: BlockId, existingReplicas: Set[BlockManagerId], maxReplicas: Int): Unit = { - logInfo(s"Pro-actively replicating $blockId") + logInfo(s"Using $blockManagerId to pro-actively replicate $blockId") blockInfoManager.lockForReading(blockId).foreach { info => val data = doGetLocalBytes(blockId, info) val storageLevel = StorageLevel( @@ -1196,9 +1196,13 @@ private[spark] class BlockManager( useOffHeap = info.level.useOffHeap, deserialized = info.level.deserialized, replication = maxReplicas) + // we know we are called as a result of an executor removal, so we refresh peer cache + // this way, we won't try to replicate to a missing executor with a stale reference + getPeers(forceFetch = true) try { replicate(blockId, data, storageLevel, info.classTag, existingReplicas) } finally { + logDebug(s"Releasing lock for $blockId") releaseLock(blockId) } } |