aboutsummaryrefslogtreecommitdiff
path: root/core/src/test/scala
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/test/scala')
-rw-r--r--core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala34
1 files changed, 34 insertions, 0 deletions
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index fdf28b7dcb..6d53d2e5f0 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -861,6 +861,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
serializerManager, conf, memoryManager, mapOutputTracker,
shuffleManager, transfer, securityMgr, 0)
memoryManager.setMemoryStore(store.memoryStore)
+ store.initialize("app-id")
// The put should fail since a1 is not serializable.
class UnserializableClass
@@ -1206,6 +1207,39 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
verify(mockBlockManagerMaster, times(2)).getLocations("item")
}
+ test("SPARK-17484: block status is properly updated following an exception in put()") {
+ val mockBlockTransferService = new MockBlockTransferService(maxFailures = 10) {
+ override def uploadBlock(
+ hostname: String,
+ port: Int, execId: String,
+ blockId: BlockId,
+ blockData: ManagedBuffer,
+ level: StorageLevel,
+ classTag: ClassTag[_]): Future[Unit] = {
+ throw new InterruptedException("Intentional interrupt")
+ }
+ }
+ store = makeBlockManager(8000, "executor1", transferService = Option(mockBlockTransferService))
+ store2 = makeBlockManager(8000, "executor2", transferService = Option(mockBlockTransferService))
+ intercept[InterruptedException] {
+ store.putSingle("item", "value", StorageLevel.MEMORY_ONLY_2, tellMaster = true)
+ }
+ assert(store.getLocalBytes("item").isEmpty)
+ assert(master.getLocations("item").isEmpty)
+ assert(store2.getRemoteBytes("item").isEmpty)
+ }
+
+ test("SPARK-17484: master block locations are updated following an invalid remote block fetch") {
+ store = makeBlockManager(8000, "executor1")
+ store2 = makeBlockManager(8000, "executor2")
+ store.putSingle("item", "value", StorageLevel.MEMORY_ONLY, tellMaster = true)
+ assert(master.getLocations("item").nonEmpty)
+ store.removeBlock("item", tellMaster = false)
+ assert(master.getLocations("item").nonEmpty)
+ assert(store2.getRemoteBytes("item").isEmpty)
+ assert(master.getLocations("item").isEmpty)
+ }
+
class MockBlockTransferService(val maxFailures: Int) extends BlockTransferService {
var numCalls = 0