aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorReynold Xin <rxin@apache.org>2014-09-29 23:17:53 -0700
committerReynold Xin <rxin@apache.org>2014-09-29 23:17:53 -0700
commitde700d31778eb68807183cf32be8034abdc0120e (patch)
tree64f90119df0f805743c227d7b9ad8ba7121169d1
parent6b79bfb42580b6bd4c4cd99fb521534a94150693 (diff)
downloadspark-de700d31778eb68807183cf32be8034abdc0120e.tar.gz
spark-de700d31778eb68807183cf32be8034abdc0120e.tar.bz2
spark-de700d31778eb68807183cf32be8034abdc0120e.zip
[SPARK-3709] Executors don't always report broadcast block removal properly back to the driver
The problem was that the 2nd argument in RemoveBroadcast is not tellMaster! It is "removeFromDriver". Basically when removeFromDriver is not true, we don't report broadcast block removal back to the driver, and then other executors mistakenly think that the executor would still have the block, and try to fetch from it. cc @tdas Author: Reynold Xin <rxin@apache.org> Closes #2588 from rxin/debug and squashes the following commits: 6dab2e3 [Reynold Xin] Don't log random messages. f430686 [Reynold Xin] Always report broadcast removal back to master. 2a13f70 [Reynold Xin] iii
-rw-r--r--core/src/main/scala/org/apache/spark/network/nio/NioBlockTransferService.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/storage/BlockManagerSlaveActor.scala4
2 files changed, 3 insertions, 3 deletions
diff --git a/core/src/main/scala/org/apache/spark/network/nio/NioBlockTransferService.scala b/core/src/main/scala/org/apache/spark/network/nio/NioBlockTransferService.scala
index 59958ee894..b389b9a202 100644
--- a/core/src/main/scala/org/apache/spark/network/nio/NioBlockTransferService.scala
+++ b/core/src/main/scala/org/apache/spark/network/nio/NioBlockTransferService.scala
@@ -200,6 +200,6 @@ final class NioBlockTransferService(conf: SparkConf, securityManager: SecurityMa
val buffer = blockDataManager.getBlockData(blockId).orNull
logDebug("GetBlock " + blockId + " used " + Utils.getUsedTimeMs(startTimeMs)
+ " and got buffer " + buffer)
- buffer.nioByteBuffer()
+ if (buffer == null) null else buffer.nioByteBuffer()
}
}
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerSlaveActor.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerSlaveActor.scala
index 14ae2f38c5..8462871e79 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerSlaveActor.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerSlaveActor.scala
@@ -58,9 +58,9 @@ class BlockManagerSlaveActor(
SparkEnv.get.shuffleManager.unregisterShuffle(shuffleId)
}
- case RemoveBroadcast(broadcastId, tellMaster) =>
+ case RemoveBroadcast(broadcastId, _) =>
doAsync[Int]("removing broadcast " + broadcastId, sender) {
- blockManager.removeBroadcast(broadcastId, tellMaster)
+ blockManager.removeBroadcast(broadcastId, tellMaster = true)
}
case GetBlockStatus(blockId, _) =>