aboutsummaryrefslogtreecommitdiff
path: root/core/src/main/scala/org/apache
diff options
context:
space:
mode:
authorShubham Chopra <schopra31@bloomberg.net>2017-03-28 09:47:29 +0800
committerWenchen Fan <wenchen@databricks.com>2017-03-28 09:47:29 +0800
commita250933c625ed720d15a0e479e9c51113605b102 (patch)
tree00ed643ec67b61160c484f99dd1ea8907ad609df /core/src/main/scala/org/apache
parent1d00761b9176a1f42976057ca78638c5b0763abc (diff)
downloadspark-a250933c625ed720d15a0e479e9c51113605b102.tar.gz
spark-a250933c625ed720d15a0e479e9c51113605b102.tar.bz2
spark-a250933c625ed720d15a0e479e9c51113605b102.zip
[SPARK-19803][CORE][TEST] Proactive replication test failures
## What changes were proposed in this pull request? Executors cache a list of their peers that is refreshed by default every minute. The cached stale references were randomly being used for replication. Since those executors were removed from the master, they did not occur in the block locations as reported by the master. This was fixed by 1. Refreshing peer cache in the block manager before trying to pro-actively replicate. This way the probability of replicating to a failed executor is eliminated. 2. Explicitly stopping the block manager in the tests. This shuts down the RPC endpoint use by the block manager. This way, even if a block manager tries to replicate using a stale reference, the replication logic should take care of refreshing the list of peers after failure. ## How was this patch tested? Tested manually Author: Shubham Chopra <schopra31@bloomberg.net> Author: Kay Ousterhout <kayousterhout@gmail.com> Author: Shubham Chopra <shubhamchopra@users.noreply.github.com> Closes #17325 from shubhamchopra/SPARK-19803.
Diffstat (limited to 'core/src/main/scala/org/apache')
-rw-r--r--core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala6
-rw-r--r--core/src/main/scala/org/apache/spark/storage/BlockManager.scala6
2 files changed, 11 insertions, 1 deletions
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala
index 490d45d12b..3db59837fb 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala
@@ -371,6 +371,12 @@ private[storage] class BlockInfoManager extends Logging {
blocksWithReleasedLocks
}
+ /** Returns the number of locks held by the given task. Used only for testing. */
+ private[storage] def getTaskLockCount(taskAttemptId: TaskAttemptId): Int = {
+ readLocksByTask.get(taskAttemptId).map(_.size()).getOrElse(0) +
+ writeLocksByTask.get(taskAttemptId).map(_.size).getOrElse(0)
+ }
+
/**
* Returns the number of blocks tracked.
*/
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 245d94ac4f..991346a40a 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -1187,7 +1187,7 @@ private[spark] class BlockManager(
blockId: BlockId,
existingReplicas: Set[BlockManagerId],
maxReplicas: Int): Unit = {
- logInfo(s"Pro-actively replicating $blockId")
+ logInfo(s"Using $blockManagerId to pro-actively replicate $blockId")
blockInfoManager.lockForReading(blockId).foreach { info =>
val data = doGetLocalBytes(blockId, info)
val storageLevel = StorageLevel(
@@ -1196,9 +1196,13 @@ private[spark] class BlockManager(
useOffHeap = info.level.useOffHeap,
deserialized = info.level.deserialized,
replication = maxReplicas)
+ // we know we are called as a result of an executor removal, so we refresh peer cache
+ // this way, we won't try to replicate to a missing executor with a stale reference
+ getPeers(forceFetch = true)
try {
replicate(blockId, data, storageLevel, info.classTag, existingReplicas)
} finally {
+ logDebug(s"Releasing lock for $blockId")
releaseLock(blockId)
}
}