diff options
Diffstat (limited to 'core/src/test')
4 files changed, 171 insertions, 239 deletions
diff --git a/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala b/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala deleted file mode 100644 index ffc02bcb01..0000000000 --- a/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala +++ /dev/null @@ -1,97 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark - -import org.mockito.Mockito._ -import org.scalatest.BeforeAndAfter -import org.scalatest.mock.MockitoSugar - -import org.apache.spark.executor.{DataReadMethod, TaskMetrics} -import org.apache.spark.rdd.RDD -import org.apache.spark.storage._ - -// TODO: Test the CacheManager's thread-safety aspects -class CacheManagerSuite extends SparkFunSuite with LocalSparkContext with BeforeAndAfter - with MockitoSugar { - - var blockManager: BlockManager = _ - var cacheManager: CacheManager = _ - var split: Partition = _ - /** An RDD which returns the values [1, 2, 3, 4]. */ - var rdd: RDD[Int] = _ - var rdd2: RDD[Int] = _ - var rdd3: RDD[Int] = _ - - before { - sc = new SparkContext("local", "test") - blockManager = mock[BlockManager] - cacheManager = new CacheManager(blockManager) - split = new Partition { override def index: Int = 0 } - rdd = new RDD[Int](sc, Nil) { - override def getPartitions: Array[Partition] = Array(split) - override val getDependencies = List[Dependency[_]]() - override def compute(split: Partition, context: TaskContext): Iterator[Int] = - Array(1, 2, 3, 4).iterator - } - rdd2 = new RDD[Int](sc, List(new OneToOneDependency(rdd))) { - override def getPartitions: Array[Partition] = firstParent[Int].partitions - override def compute(split: Partition, context: TaskContext): Iterator[Int] = - firstParent[Int].iterator(split, context) - }.cache() - rdd3 = new RDD[Int](sc, List(new OneToOneDependency(rdd2))) { - override def getPartitions: Array[Partition] = firstParent[Int].partitions - override def compute(split: Partition, context: TaskContext): Iterator[Int] = - firstParent[Int].iterator(split, context) - }.cache() - } - - test("get uncached rdd") { - // Do not mock this test, because attempting to match Array[Any], which is not covariant, - // in blockManager.put is a losing battle. You have been warned. - blockManager = sc.env.blockManager - cacheManager = sc.env.cacheManager - val context = TaskContext.empty() - val computeValue = cacheManager.getOrCompute(rdd, split, context, StorageLevel.MEMORY_ONLY) - val getValue = blockManager.get(RDDBlockId(rdd.id, split.index)) - assert(computeValue.toList === List(1, 2, 3, 4)) - assert(getValue.isDefined, "Block cached from getOrCompute is not found!") - assert(getValue.get.data.toList === List(1, 2, 3, 4)) - } - - test("get cached rdd") { - val result = new BlockResult(Array(5, 6, 7).iterator, DataReadMethod.Memory, 12) - when(blockManager.get(RDDBlockId(0, 0))).thenReturn(Some(result)) - - val context = TaskContext.empty() - val value = cacheManager.getOrCompute(rdd, split, context, StorageLevel.MEMORY_ONLY) - assert(value.toList === List(5, 6, 7)) - } - - test("verify task metrics updated correctly") { - cacheManager = sc.env.cacheManager - val context = TaskContext.empty() - try { - TaskContext.setTaskContext(context) - sc.env.blockManager.registerTask(0) - cacheManager.getOrCompute(rdd3, split, context, StorageLevel.MEMORY_ONLY) - assert(context.taskMetrics.updatedBlockStatuses.size === 2) - } finally { - TaskContext.unset() - } - } -} diff --git a/core/src/test/scala/org/apache/spark/storage/BlockInfoManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockInfoManagerSuite.scala index 662b18f667..fe83fc722a 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockInfoManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockInfoManagerSuite.scala @@ -80,10 +80,18 @@ class BlockInfoManagerSuite extends SparkFunSuite with BeforeAndAfterEach { withTaskId(1) { assert(blockInfoManager.lockNewBlockForWriting("block", blockInfo)) assert(blockInfoManager.get("block").get eq blockInfo) - assert(!blockInfoManager.lockNewBlockForWriting("block", newBlockInfo())) - assert(blockInfoManager.get("block").get eq blockInfo) assert(blockInfo.readerCount === 0) assert(blockInfo.writerTask === 1) + // Downgrade lock so that second call doesn't block: + blockInfoManager.downgradeLock("block") + assert(blockInfo.readerCount === 1) + assert(blockInfo.writerTask === BlockInfo.NO_WRITER) + assert(!blockInfoManager.lockNewBlockForWriting("block", newBlockInfo())) + assert(blockInfo.readerCount === 2) + assert(blockInfoManager.get("block").get eq blockInfo) + assert(blockInfo.readerCount === 2) + assert(blockInfo.writerTask === BlockInfo.NO_WRITER) + blockInfoManager.unlock("block") blockInfoManager.unlock("block") assert(blockInfo.readerCount === 0) assert(blockInfo.writerTask === BlockInfo.NO_WRITER) @@ -92,6 +100,67 @@ class BlockInfoManagerSuite extends SparkFunSuite with BeforeAndAfterEach { assert(blockInfoManager.getNumberOfMapEntries === initialNumMapEntries + 1) } + test("lockNewBlockForWriting blocks while write lock is held, then returns false after release") { + withTaskId(0) { + assert(blockInfoManager.lockNewBlockForWriting("block", newBlockInfo())) + } + val lock1Future = Future { + withTaskId(1) { + blockInfoManager.lockNewBlockForWriting("block", newBlockInfo()) + } + } + val lock2Future = Future { + withTaskId(2) { + blockInfoManager.lockNewBlockForWriting("block", newBlockInfo()) + } + } + Thread.sleep(300) // Hack to try to ensure that both future tasks are waiting + withTaskId(0) { + blockInfoManager.downgradeLock("block") + } + // After downgrading to a read lock, both threads should wake up and acquire the shared + // read lock. + assert(!Await.result(lock1Future, 1.seconds)) + assert(!Await.result(lock2Future, 1.seconds)) + assert(blockInfoManager.get("block").get.readerCount === 3) + } + + test("lockNewBlockForWriting blocks while write lock is held, then returns true after removal") { + withTaskId(0) { + assert(blockInfoManager.lockNewBlockForWriting("block", newBlockInfo())) + } + val lock1Future = Future { + withTaskId(1) { + blockInfoManager.lockNewBlockForWriting("block", newBlockInfo()) + } + } + val lock2Future = Future { + withTaskId(2) { + blockInfoManager.lockNewBlockForWriting("block", newBlockInfo()) + } + } + Thread.sleep(300) // Hack to try to ensure that both future tasks are waiting + withTaskId(0) { + blockInfoManager.removeBlock("block") + } + // After removing the block, the write lock is released. Both threads should wake up but only + // one should acquire the write lock. The second thread should block until the winner of the + // write race releases its lock. + val winningFuture: Future[Boolean] = + Await.ready(Future.firstCompletedOf(Seq(lock1Future, lock2Future)), 1.seconds) + assert(winningFuture.value.get.get) + val winningTID = blockInfoManager.get("block").get.writerTask + assert(winningTID === 1 || winningTID === 2) + val losingFuture: Future[Boolean] = if (winningTID == 1) lock2Future else lock1Future + assert(!losingFuture.isCompleted) + // Once the writer releases its lock, the blocked future should wake up again and complete. + withTaskId(winningTID) { + blockInfoManager.unlock("block") + } + assert(!Await.result(losingFuture, 1.seconds)) + assert(blockInfoManager.get("block").get.readerCount === 1) + } + test("read locks are reentrant") { withTaskId(1) { assert(blockInfoManager.lockNewBlockForWriting("block", newBlockInfo())) diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala index a94d8b424d..ae1faf5d98 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala @@ -190,7 +190,6 @@ class BlockManagerReplicationSuite extends SparkFunSuite with Matchers with Befo def putBlockAndGetLocations(blockId: String, level: StorageLevel): Set[BlockManagerId] = { stores.head.putSingle(blockId, new Array[Byte](blockSize), level) - stores.head.releaseLock(blockId) val locations = master.getLocations(blockId).sortBy { _.executorId }.toSet stores.foreach { _.removeBlock(blockId) } master.removeBlock(blockId) @@ -252,7 +251,6 @@ class BlockManagerReplicationSuite extends SparkFunSuite with Matchers with Befo // Insert a block with 2x replication and return the number of copies of the block def replicateAndGetNumCopies(blockId: String): Int = { store.putSingle(blockId, new Array[Byte](1000), StorageLevel.MEMORY_AND_DISK_2) - store.releaseLock(blockId) val numLocations = master.getLocations(blockId).size allStores.foreach { _.removeBlock(blockId) } numLocations @@ -290,7 +288,6 @@ class BlockManagerReplicationSuite extends SparkFunSuite with Matchers with Befo def replicateAndGetNumCopies(blockId: String, replicationFactor: Int): Int = { val storageLevel = StorageLevel(true, true, false, true, replicationFactor) initialStores.head.putSingle(blockId, new Array[Byte](blockSize), storageLevel) - initialStores.head.releaseLock(blockId) val numLocations = master.getLocations(blockId).size allStores.foreach { _.removeBlock(blockId) } numLocations @@ -358,7 +355,6 @@ class BlockManagerReplicationSuite extends SparkFunSuite with Matchers with Befo val blockId = new TestBlockId( "block-with-" + storageLevel.description.replace(" ", "-").toLowerCase) stores(0).putSingle(blockId, new Array[Byte](blockSize), storageLevel) - stores(0).releaseLock(blockId) // Assert that master know two locations for the block val blockLocations = master.getLocations(blockId).map(_.executorId).toSet @@ -397,7 +393,6 @@ class BlockManagerReplicationSuite extends SparkFunSuite with Matchers with Befo (1 to 10).foreach { i => testStore.putSingle(s"dummy-block-$i", new Array[Byte](1000), MEMORY_ONLY_SER) - testStore.releaseLock(s"dummy-block-$i") } (1 to 10).foreach { i => testStore.removeBlock(s"dummy-block-$i") diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala index e4ab9ee0eb..89b427049b 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala @@ -172,9 +172,9 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE val a3 = new Array[Byte](4000) // Putting a1, a2 and a3 in memory and telling master only about a1 and a2 - store.putSingleAndReleaseLock("a1", a1, StorageLevel.MEMORY_ONLY) - store.putSingleAndReleaseLock("a2", a2, StorageLevel.MEMORY_ONLY) - store.putSingleAndReleaseLock("a3", a3, StorageLevel.MEMORY_ONLY, tellMaster = false) + store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY) + store.putSingle("a2", a2, StorageLevel.MEMORY_ONLY) + store.putSingle("a3", a3, StorageLevel.MEMORY_ONLY, tellMaster = false) // Checking whether blocks are in memory assert(store.getSingleAndReleaseLock("a1").isDefined, "a1 was not in store") @@ -205,7 +205,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE val a1 = new Array[Byte](400) val a2 = new Array[Byte](400) - store.putSingleAndReleaseLock("a1", a1, StorageLevel.MEMORY_ONLY_2) + store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY_2) store2.putSingle("a2", a2, StorageLevel.MEMORY_ONLY_2) assert(master.getLocations("a1").size === 2, "master did not report 2 locations for a1") assert(master.getLocations("a2").size === 2, "master did not report 2 locations for a2") @@ -218,9 +218,9 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE val a3 = new Array[Byte](4000) // Putting a1, a2 and a3 in memory and telling master only about a1 and a2 - store.putSingleAndReleaseLock("a1-to-remove", a1, StorageLevel.MEMORY_ONLY) - store.putSingleAndReleaseLock("a2-to-remove", a2, StorageLevel.MEMORY_ONLY) - store.putSingleAndReleaseLock("a3-to-remove", a3, StorageLevel.MEMORY_ONLY, tellMaster = false) + store.putSingle("a1-to-remove", a1, StorageLevel.MEMORY_ONLY) + store.putSingle("a2-to-remove", a2, StorageLevel.MEMORY_ONLY) + store.putSingle("a3-to-remove", a3, StorageLevel.MEMORY_ONLY, tellMaster = false) // Checking whether blocks are in memory and memory size val memStatus = master.getMemoryStatus.head._2 @@ -265,9 +265,9 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE val a2 = new Array[Byte](4000) val a3 = new Array[Byte](4000) // Putting a1, a2 and a3 in memory. - store.putSingleAndReleaseLock(rdd(0, 0), a1, StorageLevel.MEMORY_ONLY) - store.putSingleAndReleaseLock(rdd(0, 1), a2, StorageLevel.MEMORY_ONLY) - store.putSingleAndReleaseLock("nonrddblock", a3, StorageLevel.MEMORY_ONLY) + store.putSingle(rdd(0, 0), a1, StorageLevel.MEMORY_ONLY) + store.putSingle(rdd(0, 1), a2, StorageLevel.MEMORY_ONLY) + store.putSingle("nonrddblock", a3, StorageLevel.MEMORY_ONLY) master.removeRdd(0, blocking = false) eventually(timeout(1000 milliseconds), interval(10 milliseconds)) { @@ -283,8 +283,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE master.getLocations("nonrddblock") should have size (1) } - store.putSingleAndReleaseLock(rdd(0, 0), a1, StorageLevel.MEMORY_ONLY) - store.putSingleAndReleaseLock(rdd(0, 1), a2, StorageLevel.MEMORY_ONLY) + store.putSingle(rdd(0, 0), a1, StorageLevel.MEMORY_ONLY) + store.putSingle(rdd(0, 1), a2, StorageLevel.MEMORY_ONLY) master.removeRdd(0, blocking = true) store.getSingleAndReleaseLock(rdd(0, 0)) should be (None) master.getLocations(rdd(0, 0)) should have size 0 @@ -308,10 +308,10 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE // insert broadcast blocks in both the stores Seq(driverStore, executorStore).foreach { case s => - s.putSingleAndReleaseLock(broadcast0BlockId, a1, StorageLevel.DISK_ONLY) - s.putSingleAndReleaseLock(broadcast1BlockId, a2, StorageLevel.DISK_ONLY) - s.putSingleAndReleaseLock(broadcast2BlockId, a3, StorageLevel.DISK_ONLY) - s.putSingleAndReleaseLock(broadcast2BlockId2, a4, StorageLevel.DISK_ONLY) + s.putSingle(broadcast0BlockId, a1, StorageLevel.DISK_ONLY) + s.putSingle(broadcast1BlockId, a2, StorageLevel.DISK_ONLY) + s.putSingle(broadcast2BlockId, a3, StorageLevel.DISK_ONLY) + s.putSingle(broadcast2BlockId2, a4, StorageLevel.DISK_ONLY) } // verify whether the blocks exist in both the stores @@ -366,7 +366,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE store = makeBlockManager(2000) val a1 = new Array[Byte](400) - store.putSingleAndReleaseLock("a1", a1, StorageLevel.MEMORY_ONLY) + store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY) assert(store.getSingleAndReleaseLock("a1").isDefined, "a1 was not in store") assert(master.getLocations("a1").size > 0, "master was not told about a1") @@ -384,13 +384,13 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE val a1 = new Array[Byte](400) val a2 = new Array[Byte](400) - store.putSingleAndReleaseLock("a1", a1, StorageLevel.MEMORY_ONLY) + store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY) assert(master.getLocations("a1").size > 0, "master was not told about a1") master.removeExecutor(store.blockManagerId.executorId) assert(master.getLocations("a1").size == 0, "a1 was not removed from master") - store.putSingleAndReleaseLock("a2", a2, StorageLevel.MEMORY_ONLY) + store.putSingle("a2", a2, StorageLevel.MEMORY_ONLY) store.waitForAsyncReregister() assert(master.getLocations("a1").size > 0, "a1 was not reregistered with master") @@ -407,13 +407,13 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE master.removeExecutor(store.blockManagerId.executorId) val t1 = new Thread { override def run() { - store.putIteratorAndReleaseLock( + store.putIterator( "a2", a2.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true) } } val t2 = new Thread { override def run() { - store.putSingleAndReleaseLock("a1", a1, StorageLevel.MEMORY_ONLY) + store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY) } } val t3 = new Thread { @@ -441,11 +441,11 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE val list2 = List(new Array[Byte](500), new Array[Byte](1000), new Array[Byte](1500)) val list1SizeEstimate = SizeEstimator.estimate(list1.iterator.toArray) val list2SizeEstimate = SizeEstimator.estimate(list2.iterator.toArray) - store.putIteratorAndReleaseLock( + store.putIterator( "list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true) - store.putIteratorAndReleaseLock( + store.putIterator( "list2memory", list2.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true) - store.putIteratorAndReleaseLock( + store.putIterator( "list2disk", list2.iterator, StorageLevel.DISK_ONLY, tellMaster = true) val list1Get = store.get("list1") assert(list1Get.isDefined, "list1 expected to be in store") @@ -486,9 +486,9 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE store2 = makeBlockManager(8000, "executor2") store3 = makeBlockManager(8000, "executor3") val list1 = List(new Array[Byte](4000)) - store2.putIteratorAndReleaseLock( + store2.putIterator( "list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true) - store3.putIteratorAndReleaseLock( + store3.putIterator( "list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true) assert(store.getRemoteBytes("list1").isDefined, "list1Get expected to be fetched") store2.stop() @@ -515,15 +515,15 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE val a1 = new Array[Byte](4000) val a2 = new Array[Byte](4000) val a3 = new Array[Byte](4000) - store.putSingleAndReleaseLock("a1", a1, storageLevel) - store.putSingleAndReleaseLock("a2", a2, storageLevel) - store.putSingleAndReleaseLock("a3", a3, storageLevel) + store.putSingle("a1", a1, storageLevel) + store.putSingle("a2", a2, storageLevel) + store.putSingle("a3", a3, storageLevel) assert(store.getSingleAndReleaseLock("a2").isDefined, "a2 was not in store") assert(store.getSingleAndReleaseLock("a3").isDefined, "a3 was not in store") assert(store.getSingleAndReleaseLock("a1") === None, "a1 was in store") assert(store.getSingleAndReleaseLock("a2").isDefined, "a2 was not in store") // At this point a2 was gotten last, so LRU will getSingle rid of a3 - store.putSingleAndReleaseLock("a1", a1, storageLevel) + store.putSingle("a1", a1, storageLevel) assert(store.getSingleAndReleaseLock("a1").isDefined, "a1 was not in store") assert(store.getSingleAndReleaseLock("a2").isDefined, "a2 was not in store") assert(store.getSingleAndReleaseLock("a3") === None, "a3 was in store") @@ -534,9 +534,9 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE val a1 = new Array[Byte](4000) val a2 = new Array[Byte](4000) val a3 = new Array[Byte](4000) - store.putSingleAndReleaseLock(rdd(0, 1), a1, StorageLevel.MEMORY_ONLY) - store.putSingleAndReleaseLock(rdd(0, 2), a2, StorageLevel.MEMORY_ONLY) - store.putSingleAndReleaseLock(rdd(0, 3), a3, StorageLevel.MEMORY_ONLY) + store.putSingle(rdd(0, 1), a1, StorageLevel.MEMORY_ONLY) + store.putSingle(rdd(0, 2), a2, StorageLevel.MEMORY_ONLY) + store.putSingle(rdd(0, 3), a3, StorageLevel.MEMORY_ONLY) // Even though we accessed rdd_0_3 last, it should not have replaced partitions 1 and 2 // from the same RDD assert(store.getSingleAndReleaseLock(rdd(0, 3)) === None, "rdd_0_3 was in store") @@ -550,9 +550,9 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE test("in-memory LRU for partitions of multiple RDDs") { store = makeBlockManager(12000) - store.putSingleAndReleaseLock(rdd(0, 1), new Array[Byte](4000), StorageLevel.MEMORY_ONLY) - store.putSingleAndReleaseLock(rdd(0, 2), new Array[Byte](4000), StorageLevel.MEMORY_ONLY) - store.putSingleAndReleaseLock(rdd(1, 1), new Array[Byte](4000), StorageLevel.MEMORY_ONLY) + store.putSingle(rdd(0, 1), new Array[Byte](4000), StorageLevel.MEMORY_ONLY) + store.putSingle(rdd(0, 2), new Array[Byte](4000), StorageLevel.MEMORY_ONLY) + store.putSingle(rdd(1, 1), new Array[Byte](4000), StorageLevel.MEMORY_ONLY) // At this point rdd_1_1 should've replaced rdd_0_1 assert(store.memoryStore.contains(rdd(1, 1)), "rdd_1_1 was not in store") assert(!store.memoryStore.contains(rdd(0, 1)), "rdd_0_1 was in store") @@ -560,8 +560,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE // Do a get() on rdd_0_2 so that it is the most recently used item assert(store.getSingleAndReleaseLock(rdd(0, 2)).isDefined, "rdd_0_2 was not in store") // Put in more partitions from RDD 0; they should replace rdd_1_1 - store.putSingleAndReleaseLock(rdd(0, 3), new Array[Byte](4000), StorageLevel.MEMORY_ONLY) - store.putSingleAndReleaseLock(rdd(0, 4), new Array[Byte](4000), StorageLevel.MEMORY_ONLY) + store.putSingle(rdd(0, 3), new Array[Byte](4000), StorageLevel.MEMORY_ONLY) + store.putSingle(rdd(0, 4), new Array[Byte](4000), StorageLevel.MEMORY_ONLY) // Now rdd_1_1 should be dropped to add rdd_0_3, but then rdd_0_2 should *not* be dropped // when we try to add rdd_0_4. assert(!store.memoryStore.contains(rdd(1, 1)), "rdd_1_1 was in store") @@ -576,9 +576,9 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE val a1 = new Array[Byte](400) val a2 = new Array[Byte](400) val a3 = new Array[Byte](400) - store.putSingleAndReleaseLock("a1", a1, StorageLevel.DISK_ONLY) - store.putSingleAndReleaseLock("a2", a2, StorageLevel.DISK_ONLY) - store.putSingleAndReleaseLock("a3", a3, StorageLevel.DISK_ONLY) + store.putSingle("a1", a1, StorageLevel.DISK_ONLY) + store.putSingle("a2", a2, StorageLevel.DISK_ONLY) + store.putSingle("a3", a3, StorageLevel.DISK_ONLY) assert(store.getSingleAndReleaseLock("a2").isDefined, "a2 was in store") assert(store.getSingleAndReleaseLock("a3").isDefined, "a3 was in store") assert(store.getSingleAndReleaseLock("a1").isDefined, "a1 was in store") @@ -607,9 +607,9 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE val a1 = new Array[Byte](4000) val a2 = new Array[Byte](4000) val a3 = new Array[Byte](4000) - store.putSingleAndReleaseLock("a1", a1, storageLevel) - store.putSingleAndReleaseLock("a2", a2, storageLevel) - store.putSingleAndReleaseLock("a3", a3, storageLevel) + store.putSingle("a1", a1, storageLevel) + store.putSingle("a2", a2, storageLevel) + store.putSingle("a3", a3, storageLevel) assert(accessMethod(store)("a2").isDefined, "a2 was not in store") assert(accessMethod(store)("a3").isDefined, "a3 was not in store") assert(store.memoryStore.getValues("a1").isEmpty, "a1 was in memory store") @@ -624,15 +624,15 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE val a3 = new Array[Byte](4000) val a4 = new Array[Byte](4000) // First store a1 and a2, both in memory, and a3, on disk only - store.putSingleAndReleaseLock("a1", a1, StorageLevel.MEMORY_ONLY_SER) - store.putSingleAndReleaseLock("a2", a2, StorageLevel.MEMORY_ONLY_SER) - store.putSingleAndReleaseLock("a3", a3, StorageLevel.DISK_ONLY) + store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY_SER) + store.putSingle("a2", a2, StorageLevel.MEMORY_ONLY_SER) + store.putSingle("a3", a3, StorageLevel.DISK_ONLY) // At this point LRU should not kick in because a3 is only on disk assert(store.getSingleAndReleaseLock("a1").isDefined, "a1 was not in store") assert(store.getSingleAndReleaseLock("a2").isDefined, "a2 was not in store") assert(store.getSingleAndReleaseLock("a3").isDefined, "a3 was not in store") // Now let's add in a4, which uses both disk and memory; a1 should drop out - store.putSingleAndReleaseLock("a4", a4, StorageLevel.MEMORY_AND_DISK_SER) + store.putSingle("a4", a4, StorageLevel.MEMORY_AND_DISK_SER) assert(store.getSingleAndReleaseLock("a1") == None, "a1 was in store") assert(store.getSingleAndReleaseLock("a2").isDefined, "a2 was not in store") assert(store.getSingleAndReleaseLock("a3").isDefined, "a3 was not in store") @@ -644,11 +644,11 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE val list1 = List(new Array[Byte](2000), new Array[Byte](2000)) val list2 = List(new Array[Byte](2000), new Array[Byte](2000)) val list3 = List(new Array[Byte](2000), new Array[Byte](2000)) - store.putIteratorAndReleaseLock( + store.putIterator( "list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true) - store.putIteratorAndReleaseLock( + store.putIterator( "list2", list2.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true) - store.putIteratorAndReleaseLock( + store.putIterator( "list3", list3.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true) assert(store.getAndReleaseLock("list2").isDefined, "list2 was not in store") assert(store.get("list2").get.data.size === 2) @@ -658,7 +658,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE assert(store.getAndReleaseLock("list2").isDefined, "list2 was not in store") assert(store.get("list2").get.data.size === 2) // At this point list2 was gotten last, so LRU will getSingle rid of list3 - store.putIteratorAndReleaseLock( + store.putIterator( "list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true) assert(store.getAndReleaseLock("list1").isDefined, "list1 was not in store") assert(store.get("list1").get.data.size === 2) @@ -674,11 +674,11 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE val list3 = List(new Array[Byte](2000), new Array[Byte](2000)) val list4 = List(new Array[Byte](2000), new Array[Byte](2000)) // First store list1 and list2, both in memory, and list3, on disk only - store.putIteratorAndReleaseLock( + store.putIterator( "list1", list1.iterator, StorageLevel.MEMORY_ONLY_SER, tellMaster = true) - store.putIteratorAndReleaseLock( + store.putIterator( "list2", list2.iterator, StorageLevel.MEMORY_ONLY_SER, tellMaster = true) - store.putIteratorAndReleaseLock( + store.putIterator( "list3", list3.iterator, StorageLevel.DISK_ONLY, tellMaster = true) val listForSizeEstimate = new ArrayBuffer[Any] listForSizeEstimate ++= list1.iterator @@ -697,7 +697,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE assert(store.getAndReleaseLock("list3").isDefined, "list3 was not in store") assert(store.get("list3").get.data.size === 2) // Now let's add in list4, which uses both disk and memory; list1 should drop out - store.putIteratorAndReleaseLock( + store.putIterator( "list4", list4.iterator, StorageLevel.MEMORY_AND_DISK_SER, tellMaster = true) assert(store.getAndReleaseLock("list1") === None, "list1 was in store") assert(store.getAndReleaseLock("list2").isDefined, "list2 was not in store") @@ -722,9 +722,9 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE test("overly large block") { store = makeBlockManager(5000) - store.putSingleAndReleaseLock("a1", new Array[Byte](10000), StorageLevel.MEMORY_ONLY) + store.putSingle("a1", new Array[Byte](10000), StorageLevel.MEMORY_ONLY) assert(store.getSingleAndReleaseLock("a1") === None, "a1 was in store") - store.putSingleAndReleaseLock("a2", new Array[Byte](10000), StorageLevel.MEMORY_AND_DISK) + store.putSingle("a2", new Array[Byte](10000), StorageLevel.MEMORY_AND_DISK) assert(store.memoryStore.getValues("a2") === None, "a2 was in memory store") assert(store.getSingleAndReleaseLock("a2").isDefined, "a2 was not in store") } @@ -733,7 +733,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE try { conf.set("spark.shuffle.compress", "true") store = makeBlockManager(20000, "exec1") - store.putSingleAndReleaseLock( + store.putSingle( ShuffleBlockId(0, 0, 0), new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER) assert(store.memoryStore.getSize(ShuffleBlockId(0, 0, 0)) <= 100, "shuffle_0_0_0 was not compressed") @@ -742,7 +742,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE conf.set("spark.shuffle.compress", "false") store = makeBlockManager(20000, "exec2") - store.putSingleAndReleaseLock( + store.putSingle( ShuffleBlockId(0, 0, 0), new Array[Byte](10000), StorageLevel.MEMORY_ONLY_SER) assert(store.memoryStore.getSize(ShuffleBlockId(0, 0, 0)) >= 10000, "shuffle_0_0_0 was compressed") @@ -751,7 +751,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE conf.set("spark.broadcast.compress", "true") store = makeBlockManager(20000, "exec3") - store.putSingleAndReleaseLock( + store.putSingle( BroadcastBlockId(0), new Array[Byte](10000), StorageLevel.MEMORY_ONLY_SER) assert(store.memoryStore.getSize(BroadcastBlockId(0)) <= 1000, "broadcast_0 was not compressed") @@ -760,7 +760,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE conf.set("spark.broadcast.compress", "false") store = makeBlockManager(20000, "exec4") - store.putSingleAndReleaseLock( + store.putSingle( BroadcastBlockId(0), new Array[Byte](10000), StorageLevel.MEMORY_ONLY_SER) assert(store.memoryStore.getSize(BroadcastBlockId(0)) >= 10000, "broadcast_0 was compressed") store.stop() @@ -768,21 +768,21 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE conf.set("spark.rdd.compress", "true") store = makeBlockManager(20000, "exec5") - store.putSingleAndReleaseLock(rdd(0, 0), new Array[Byte](10000), StorageLevel.MEMORY_ONLY_SER) + store.putSingle(rdd(0, 0), new Array[Byte](10000), StorageLevel.MEMORY_ONLY_SER) assert(store.memoryStore.getSize(rdd(0, 0)) <= 1000, "rdd_0_0 was not compressed") store.stop() store = null conf.set("spark.rdd.compress", "false") store = makeBlockManager(20000, "exec6") - store.putSingleAndReleaseLock(rdd(0, 0), new Array[Byte](10000), StorageLevel.MEMORY_ONLY_SER) + store.putSingle(rdd(0, 0), new Array[Byte](10000), StorageLevel.MEMORY_ONLY_SER) assert(store.memoryStore.getSize(rdd(0, 0)) >= 10000, "rdd_0_0 was compressed") store.stop() store = null // Check that any other block types are also kept uncompressed store = makeBlockManager(20000, "exec7") - store.putSingleAndReleaseLock("other_block", new Array[Byte](10000), StorageLevel.MEMORY_ONLY) + store.putSingle("other_block", new Array[Byte](10000), StorageLevel.MEMORY_ONLY) assert(store.memoryStore.getSize("other_block") >= 10000, "other_block was compressed") store.stop() store = null @@ -810,7 +810,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE class UnserializableClass val a1 = new UnserializableClass intercept[java.io.NotSerializableException] { - store.putSingleAndReleaseLock("a1", a1, StorageLevel.DISK_ONLY) + store.putSingle("a1", a1, StorageLevel.DISK_ONLY) } // Make sure get a1 doesn't hang and returns None. @@ -882,7 +882,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE // 1 updated block (i.e. list1) val updatedBlocks1 = getUpdatedBlocks { - store.putIteratorAndReleaseLock( + store.putIterator( "list1", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true) } assert(updatedBlocks1.size === 1) @@ -891,7 +891,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE // 1 updated block (i.e. list2) val updatedBlocks2 = getUpdatedBlocks { - store.putIteratorAndReleaseLock( + store.putIterator( "list2", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true) } assert(updatedBlocks2.size === 1) @@ -900,7 +900,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE // 2 updated blocks - list1 is kicked out of memory while list3 is added val updatedBlocks3 = getUpdatedBlocks { - store.putIteratorAndReleaseLock( + store.putIterator( "list3", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true) } assert(updatedBlocks3.size === 2) @@ -915,7 +915,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE // 2 updated blocks - list2 is kicked out of memory (but put on disk) while list4 is added val updatedBlocks4 = getUpdatedBlocks { - store.putIteratorAndReleaseLock( + store.putIterator( "list4", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true) } assert(updatedBlocks4.size === 2) @@ -931,7 +931,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE // No updated blocks - list5 is too big to fit in store and nothing is kicked out val updatedBlocks5 = getUpdatedBlocks { - store.putIteratorAndReleaseLock( + store.putIterator( "list5", bigList.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true) } assert(updatedBlocks5.size === 0) @@ -956,11 +956,11 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE val list = List.fill(2)(new Array[Byte](2000)) // Tell master. By LRU, only list2 and list3 remains. - store.putIteratorAndReleaseLock( + store.putIterator( "list1", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true) - store.putIteratorAndReleaseLock( + store.putIterator( "list2", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true) - store.putIteratorAndReleaseLock( + store.putIterator( "list3", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true) // getLocations and getBlockStatus should yield the same locations @@ -975,11 +975,11 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE assert(store.master.getBlockStatus("list3", askSlaves = true).size === 1) // This time don't tell master and see what happens. By LRU, only list5 and list6 remains. - store.putIteratorAndReleaseLock( + store.putIterator( "list4", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = false) - store.putIteratorAndReleaseLock( + store.putIterator( "list5", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = false) - store.putIteratorAndReleaseLock( + store.putIterator( "list6", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = false) // getLocations should return nothing because the master is not informed @@ -1001,11 +1001,11 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE val list = List.fill(2)(new Array[Byte](100)) // insert some blocks - store.putIteratorAndReleaseLock( + store.putIterator( "list1", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true) - store.putIteratorAndReleaseLock( + store.putIterator( "list2", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true) - store.putIteratorAndReleaseLock( + store.putIterator( "list3", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true) // getLocations and getBlockStatus should yield the same locations @@ -1015,11 +1015,11 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE === 1) // insert some more blocks - store.putIteratorAndReleaseLock( + store.putIterator( "newlist1", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true) - store.putIteratorAndReleaseLock( + store.putIterator( "newlist2", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = false) - store.putIteratorAndReleaseLock( + store.putIterator( "newlist3", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = false) // getLocations and getBlockStatus should yield the same locations @@ -1030,7 +1030,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE val blockIds = Seq(RDDBlockId(1, 0), RDDBlockId(1, 1), RDDBlockId(2, 0)) blockIds.foreach { blockId => - store.putIteratorAndReleaseLock( + store.putIterator( blockId, list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true) } val matchedBlockIds = store.master.getMatchingBlockIds(_ match { @@ -1042,12 +1042,12 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE test("SPARK-1194 regression: fix the same-RDD rule for cache replacement") { store = makeBlockManager(12000) - store.putSingleAndReleaseLock(rdd(0, 0), new Array[Byte](4000), StorageLevel.MEMORY_ONLY) - store.putSingleAndReleaseLock(rdd(1, 0), new Array[Byte](4000), StorageLevel.MEMORY_ONLY) + store.putSingle(rdd(0, 0), new Array[Byte](4000), StorageLevel.MEMORY_ONLY) + store.putSingle(rdd(1, 0), new Array[Byte](4000), StorageLevel.MEMORY_ONLY) // Access rdd_1_0 to ensure it's not least recently used. assert(store.getSingleAndReleaseLock(rdd(1, 0)).isDefined, "rdd_1_0 was not in store") // According to the same-RDD rule, rdd_1_0 should be replaced here. - store.putSingleAndReleaseLock(rdd(0, 1), new Array[Byte](4000), StorageLevel.MEMORY_ONLY) + store.putSingle(rdd(0, 1), new Array[Byte](4000), StorageLevel.MEMORY_ONLY) // rdd_1_0 should have been replaced, even it's not least recently used. assert(store.memoryStore.contains(rdd(0, 0)), "rdd_0_0 was not in store") assert(store.memoryStore.contains(rdd(0, 1)), "rdd_0_1 was not in store") @@ -1126,8 +1126,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE memoryStore.releasePendingUnrollMemoryForThisTask() // Unroll with not enough space. This should succeed after kicking out someBlock1. - store.putIteratorAndReleaseLock("someBlock1", smallList.iterator, StorageLevel.MEMORY_ONLY) - store.putIteratorAndReleaseLock("someBlock2", smallList.iterator, StorageLevel.MEMORY_ONLY) + store.putIterator("someBlock1", smallList.iterator, StorageLevel.MEMORY_ONLY) + store.putIterator("someBlock2", smallList.iterator, StorageLevel.MEMORY_ONLY) unrollResult = memoryStore.unrollSafely("unroll", smallList.iterator) verifyUnroll(smallList.iterator, unrollResult, shouldBeArray = true) assert(memoryStore.currentUnrollMemoryForThisTask === 0) @@ -1138,7 +1138,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE // Unroll huge block with not enough space. Even after ensuring free space of 12000 * 0.4 = // 4800 bytes, there is still not enough room to unroll this block. This returns an iterator. // In the mean time, however, we kicked out someBlock2 before giving up. - store.putIteratorAndReleaseLock("someBlock3", smallList.iterator, StorageLevel.MEMORY_ONLY) + store.putIterator("someBlock3", smallList.iterator, StorageLevel.MEMORY_ONLY) unrollResult = memoryStore.unrollSafely("unroll", bigList.iterator) verifyUnroll(bigList.iterator, unrollResult, shouldBeArray = false) assert(memoryStore.currentUnrollMemoryForThisTask > 0) // we returned an iterator @@ -1170,8 +1170,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE // would not know how to drop them from memory later. memoryStore.remove("b1") memoryStore.remove("b2") - store.putIteratorAndReleaseLock("b1", smallIterator, memOnly) - store.putIteratorAndReleaseLock("b2", smallIterator, memOnly) + store.putIterator("b1", smallIterator, memOnly) + store.putIterator("b2", smallIterator, memOnly) // Unroll with not enough space. This should succeed but kick out b1 in the process. val result3 = memoryStore.putIterator("b3", smallIterator, memOnly, returnValues = true) @@ -1182,7 +1182,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE assert(memoryStore.contains("b3")) assert(memoryStore.currentUnrollMemoryForThisTask === 0) memoryStore.remove("b3") - store.putIteratorAndReleaseLock("b3", smallIterator, memOnly) + store.putIterator("b3", smallIterator, memOnly) // Unroll huge block with not enough space. This should fail and kick out b2 in the process. val result4 = memoryStore.putIterator("b4", bigIterator, memOnly, returnValues = true) @@ -1209,8 +1209,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE def bigIterator: Iterator[Any] = bigList.iterator.asInstanceOf[Iterator[Any]] assert(memoryStore.currentUnrollMemoryForThisTask === 0) - store.putIteratorAndReleaseLock("b1", smallIterator, memAndDisk) - store.putIteratorAndReleaseLock("b2", smallIterator, memAndDisk) + store.putIterator("b1", smallIterator, memAndDisk) + store.putIterator("b2", smallIterator, memAndDisk) // Unroll with not enough space. This should succeed but kick out b1 in the process. // Memory store should contain b2 and b3, while disk store should contain only b1 @@ -1223,7 +1223,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE assert(!diskStore.contains("b2")) assert(!diskStore.contains("b3")) memoryStore.remove("b3") - store.putIteratorAndReleaseLock("b3", smallIterator, StorageLevel.MEMORY_ONLY) + store.putIterator("b3", smallIterator, StorageLevel.MEMORY_ONLY) assert(memoryStore.currentUnrollMemoryForThisTask === 0) // Unroll huge block with not enough space. This should fail and drop the new block to disk @@ -1310,12 +1310,12 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE store = makeBlockManager(12000) val arr = new Array[Byte](4000) // First store a1 and a2, both in memory, and a3, on disk only - store.putSingleAndReleaseLock("a1", arr, StorageLevel.MEMORY_ONLY_SER) - store.putSingleAndReleaseLock("a2", arr, StorageLevel.MEMORY_ONLY_SER) + store.putSingle("a1", arr, StorageLevel.MEMORY_ONLY_SER) + store.putSingle("a2", arr, StorageLevel.MEMORY_ONLY_SER) assert(store.getSingle("a1").isDefined, "a1 was not in store") assert(store.getSingle("a2").isDefined, "a2 was not in store") // This put should fail because both a1 and a2 should be read-locked: - store.putSingleAndReleaseLock("a3", arr, StorageLevel.MEMORY_ONLY_SER) + store.putSingle("a3", arr, StorageLevel.MEMORY_ONLY_SER) assert(store.getSingle("a3").isEmpty, "a3 was in store") assert(store.getSingle("a1").isDefined, "a1 was not in store") assert(store.getSingle("a2").isDefined, "a2 was not in store") @@ -1324,7 +1324,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE store.releaseLock("a2") // Block a1 is the least-recently accessed, so an LRU eviction policy would evict it before // block a2. However, a1 is still pinned so this put of a3 should evict a2 instead: - store.putSingleAndReleaseLock("a3", arr, StorageLevel.MEMORY_ONLY_SER) + store.putSingle("a3", arr, StorageLevel.MEMORY_ONLY_SER) assert(store.getSingle("a2").isEmpty, "a2 was in store") assert(store.getSingle("a1").isDefined, "a1 was not in store") assert(store.getSingle("a3").isDefined, "a3 was not in store") @@ -1335,41 +1335,6 @@ private object BlockManagerSuite { private implicit class BlockManagerTestUtils(store: BlockManager) { - def putSingleAndReleaseLock( - block: BlockId, - value: Any, - storageLevel: StorageLevel, - tellMaster: Boolean): Unit = { - if (store.putSingle(block, value, storageLevel, tellMaster)) { - store.releaseLock(block) - } - } - - def putSingleAndReleaseLock(block: BlockId, value: Any, storageLevel: StorageLevel): Unit = { - if (store.putSingle(block, value, storageLevel)) { - store.releaseLock(block) - } - } - - def putIteratorAndReleaseLock( - blockId: BlockId, - values: Iterator[Any], - level: StorageLevel): Unit = { - if (store.putIterator(blockId, values, level)) { - store.releaseLock(blockId) - } - } - - def putIteratorAndReleaseLock( - blockId: BlockId, - values: Iterator[Any], - level: StorageLevel, - tellMaster: Boolean): Unit = { - if (store.putIterator(blockId, values, level, tellMaster)) { - store.releaseLock(blockId) - } - } - def dropFromMemoryIfExists( blockId: BlockId, data: () => Either[Array[Any], ByteBuffer]): Unit = { |