From de700d31778eb68807183cf32be8034abdc0120e Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Mon, 29 Sep 2014 23:17:53 -0700 Subject: [SPARK-3709] Executors don't always report broadcast block removal properly back to the driver The problem was that the 2nd argument in RemoveBroadcast is not tellMaster! It is "removeFromDriver". Basically when removeFromDriver is not true, we don't report broadcast block removal back to the driver, and then other executors mistakenly think that the executor would still have the block, and try to fetch from it. cc @tdas Author: Reynold Xin Closes #2588 from rxin/debug and squashes the following commits: 6dab2e3 [Reynold Xin] Don't log random messages. f430686 [Reynold Xin] Always report broadcast removal back to master. 2a13f70 [Reynold Xin] iii --- .../scala/org/apache/spark/network/nio/NioBlockTransferService.scala | 2 +- .../main/scala/org/apache/spark/storage/BlockManagerSlaveActor.scala | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) (limited to 'core/src/main/scala') diff --git a/core/src/main/scala/org/apache/spark/network/nio/NioBlockTransferService.scala b/core/src/main/scala/org/apache/spark/network/nio/NioBlockTransferService.scala index 59958ee894..b389b9a202 100644 --- a/core/src/main/scala/org/apache/spark/network/nio/NioBlockTransferService.scala +++ b/core/src/main/scala/org/apache/spark/network/nio/NioBlockTransferService.scala @@ -200,6 +200,6 @@ final class NioBlockTransferService(conf: SparkConf, securityManager: SecurityMa val buffer = blockDataManager.getBlockData(blockId).orNull logDebug("GetBlock " + blockId + " used " + Utils.getUsedTimeMs(startTimeMs) + " and got buffer " + buffer) - buffer.nioByteBuffer() + if (buffer == null) null else buffer.nioByteBuffer() } } diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerSlaveActor.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerSlaveActor.scala index 14ae2f38c5..8462871e79 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerSlaveActor.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerSlaveActor.scala @@ -58,9 +58,9 @@ class BlockManagerSlaveActor( SparkEnv.get.shuffleManager.unregisterShuffle(shuffleId) } - case RemoveBroadcast(broadcastId, tellMaster) => + case RemoveBroadcast(broadcastId, _) => doAsync[Int]("removing broadcast " + broadcastId, sender) { - blockManager.removeBroadcast(broadcastId, tellMaster) + blockManager.removeBroadcast(broadcastId, tellMaster = true) } case GetBlockStatus(blockId, _) => -- cgit v1.2.3