about summary refs log tree commit diff
path: root/core/src/main
diff options
context:
space:
mode:
authorTal Sliwowicz <tal.s@taboola.com>2014-10-24 13:51:25 -0700
committerAndrew Or <andrew@databricks.com>2014-10-24 13:51:25 -0700
commit59297e9510557edd4828a3961aa3559dbeae5f30 (patch)
tree7009b01b8047d2f6a631c68212de0bef8132c0a2 /core/src/main
parent80dde80a6d4f07d521dfeb471c425a67811504d9 (diff)
downloadspark-59297e9510557edd4828a3961aa3559dbeae5f30.tar.gz
spark-59297e9510557edd4828a3961aa3559dbeae5f30.tar.bz2
spark-59297e9510557edd4828a3961aa3559dbeae5f30.zip
[SPARK-4006] In long running contexts, we encountered the situation of d...
...ouble registe... ...r without a remove in between. The cause for that is unknown, and assumed a temp network issue. However, since the second register is with a BlockManagerId on a different port, blockManagerInfo.contains() returns false, while blockManagerIdByExecutor returns Some. This inconsistency is caught in a conditional statement that does System.exit(1), which is a huge robustness issue for us. The fix - simply remove the old id from both maps during register when this happens. We are mimicking the behavior of expireDeadHosts(), by doing local cleanup of the maps before trying to add new ones. Also - added some logging for register and unregister. This is just like https://github.com/apache/spark/pull/2886 except it's on branch-1.1 Author: Tal Sliwowicz <tal.s@taboola.com> Closes #2915 from tsliwowicz/branch-1.1-block-mgr-removal and squashes the following commits: d122236 [Tal Sliwowicz] [SPARK-4006] In long running contexts, we encountered the situation of double registe...
Diffstat (limited to 'core/src/main')
-rw-r--r--core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala25
1 file changed, 13 insertions(+), 12 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
index 3ab07703b6..dc80148e13 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
@@ -204,6 +204,7 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
}
}
listenerBus.post(SparkListenerBlockManagerRemoved(blockManagerId))
+ logInfo(s"Removing block manager $blockManagerId")
}
private def expireDeadHosts() {
@@ -327,20 +328,20 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
private def register(id: BlockManagerId, maxMemSize: Long, slaveActor: ActorRef) {
if (!blockManagerInfo.contains(id)) {
blockManagerIdByExecutor.get(id.executorId) match {
- case Some(manager) =>
- // A block manager of the same executor already exists.
- // This should never happen. Let's just quit.
- logError("Got two different block manager registrations on " + id.executorId)
- System.exit(1)
+ case Some(oldId) =>
+ // A block manager of the same executor already exists, so remove it (assumed dead)
+ logError("Got two different block manager registrations on same executor - "
+ + s" will replace old one $oldId with new one $id")
+ removeExecutor(id.executorId)
case None =>
- blockManagerIdByExecutor(id.executorId) = id
}
-
- logInfo("Registering block manager %s with %s RAM".format(
- id.hostPort, Utils.bytesToString(maxMemSize)))
-
- blockManagerInfo(id) =
- new BlockManagerInfo(id, System.currentTimeMillis(), maxMemSize, slaveActor)
+ logInfo("Registering block manager %s with %s RAM, %s".format(
+ id.hostPort, Utils.bytesToString(maxMemSize), id))
+
+ blockManagerIdByExecutor(id.executorId) = id
+
+ blockManagerInfo(id) = new BlockManagerInfo(
+ id, System.currentTimeMillis(), maxMemSize, slaveActor)
}
listenerBus.post(SparkListenerBlockManagerAdded(id, maxMemSize))
}