about summary refs log tree commit diff
path: root/core/src/test/scala
diff options
context:
space:
mode:
authorJosh Rosen <joshrosen@databricks.com>2016-09-15 11:54:17 -0700
committerJosh Rosen <joshrosen@databricks.com>2016-09-15 11:54:17 -0700
commit1202075c95eabba0ffebc170077df798f271a139 (patch)
tree6eb8e585de949f3a257f924e479af6563cb730cb /core/src/test/scala
parenta6b8182006d0c3dda67c06861067ca78383ecf1b (diff)
downloadspark-1202075c95eabba0ffebc170077df798f271a139.tar.gz
spark-1202075c95eabba0ffebc170077df798f271a139.tar.bz2
spark-1202075c95eabba0ffebc170077df798f271a139.zip
[SPARK-17484] Prevent invalid block locations from being reported after put() exceptions
## What changes were proposed in this pull request? If a BlockManager `put()` call failed after the BlockManagerMaster was notified of a block's availability then incomplete cleanup logic in a `finally` block would never send a second block status message to inform the master of the block's unavailability. This, in turn, leads to fetch failures and used to be capable of causing complete job failures before #15037 was fixed. This patch addresses this issue via multiple small changes: - The `finally` block now calls `removeBlockInternal` when cleaning up from a failed `put()`; in addition to removing the `BlockInfo` entry (which was _all_ that the old cleanup logic did), this code (redundantly) tries to remove the block from the memory and disk stores (as an added layer of defense against bugs lower down in the stack) and optionally notifies the master of block removal (which now happens during exception-triggered cleanup). - When a BlockManager receives a request for a block that it does not have it will now notify the master to update its block locations. This ensures that bad metadata pointing to non-existent blocks will eventually be fixed. Note that I could have implemented this logic in the block manager client (rather than in the remote server), but that would introduce the problem of distinguishing between transient and permanent failures; on the server, however, we know definitively that the block isn't present. - Catch `NonFatal` instead of `Exception` to avoid swallowing `InterruptedException`s thrown from synchronous block replication calls. This patch depends upon the refactorings in #15036, so that other patch will also have to be backported when backporting this fix. For more background on this issue, including example logs from a real production failure, see [SPARK-17484](https://issues.apache.org/jira/browse/SPARK-17484). ## How was this patch tested? Two new regression tests in BlockManagerSuite. 
Author: Josh Rosen <joshrosen@databricks.com> Closes #15085 from JoshRosen/SPARK-17484.
Diffstat (limited to 'core/src/test/scala')
-rw-r--r-- core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala | 34
1 file changed, 34 insertions, 0 deletions
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index fdf28b7dcb..6d53d2e5f0 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -861,6 +861,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
serializerManager, conf, memoryManager, mapOutputTracker,
shuffleManager, transfer, securityMgr, 0)
memoryManager.setMemoryStore(store.memoryStore)
+ store.initialize("app-id")
// The put should fail since a1 is not serializable.
class UnserializableClass
@@ -1206,6 +1207,39 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
verify(mockBlockManagerMaster, times(2)).getLocations("item")
}
+ test("SPARK-17484: block status is properly updated following an exception in put()") {
+ val mockBlockTransferService = new MockBlockTransferService(maxFailures = 10) {
+ override def uploadBlock(
+ hostname: String,
+ port: Int, execId: String,
+ blockId: BlockId,
+ blockData: ManagedBuffer,
+ level: StorageLevel,
+ classTag: ClassTag[_]): Future[Unit] = {
+ throw new InterruptedException("Intentional interrupt")
+ }
+ }
+ store = makeBlockManager(8000, "executor1", transferService = Option(mockBlockTransferService))
+ store2 = makeBlockManager(8000, "executor2", transferService = Option(mockBlockTransferService))
+ intercept[InterruptedException] {
+ store.putSingle("item", "value", StorageLevel.MEMORY_ONLY_2, tellMaster = true)
+ }
+ assert(store.getLocalBytes("item").isEmpty)
+ assert(master.getLocations("item").isEmpty)
+ assert(store2.getRemoteBytes("item").isEmpty)
+ }
+
+ test("SPARK-17484: master block locations are updated following an invalid remote block fetch") {
+ store = makeBlockManager(8000, "executor1")
+ store2 = makeBlockManager(8000, "executor2")
+ store.putSingle("item", "value", StorageLevel.MEMORY_ONLY, tellMaster = true)
+ assert(master.getLocations("item").nonEmpty)
+ store.removeBlock("item", tellMaster = false)
+ assert(master.getLocations("item").nonEmpty)
+ assert(store2.getRemoteBytes("item").isEmpty)
+ assert(master.getLocations("item").isEmpty)
+ }
+
class MockBlockTransferService(val maxFailures: Int) extends BlockTransferService {
var numCalls = 0