Diffstat (limited to 'core/src/test')
-rw-r--r--  core/src/test/scala/org/apache/spark/CacheManagerSuite.scala                    |  97
-rw-r--r--  core/src/test/scala/org/apache/spark/storage/BlockInfoManagerSuite.scala        |  73
-rw-r--r--  core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala |   5
-rw-r--r--  core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala            | 235
4 files changed, 171 insertions(+), 239 deletions(-)
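
The common thread in these changes is the block-locking calling convention: the helpers deleted at the bottom of BlockManagerSuite show that puts used to leave a write lock that the caller had to release. A minimal before/after sketch, assuming putSingle still returns a Boolean success flag as those deleted helpers do:

  import org.apache.spark.storage.{BlockManager, StorageLevel, TestBlockId}

  // Hypothetical helper contrasting the two conventions; `store` comes from a
  // test fixture, as in the suites below.
  def putBothWays(store: BlockManager, value: Any): Unit = {
    val id = TestBlockId("example")
    // Old convention (removed throughout this diff): unpin after a successful put.
    if (store.putSingle(id, value, StorageLevel.MEMORY_ONLY)) {
      store.releaseLock(id)
    }
    // New convention: the put no longer leaves the caller holding the lock.
    store.putSingle(id, value, StorageLevel.MEMORY_ONLY)
  }
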
diff --git a/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala b/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala
deleted file mode 100644
index ffc02bcb01..0000000000
--- a/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark
-
-import org.mockito.Mockito._
-import org.scalatest.BeforeAndAfter
-import org.scalatest.mock.MockitoSugar
-
-import org.apache.spark.executor.{DataReadMethod, TaskMetrics}
-import org.apache.spark.rdd.RDD
-import org.apache.spark.storage._
-
-// TODO: Test the CacheManager's thread-safety aspects
-class CacheManagerSuite extends SparkFunSuite with LocalSparkContext with BeforeAndAfter
- with MockitoSugar {
-
- var blockManager: BlockManager = _
- var cacheManager: CacheManager = _
- var split: Partition = _
- /** An RDD which returns the values [1, 2, 3, 4]. */
- var rdd: RDD[Int] = _
- var rdd2: RDD[Int] = _
- var rdd3: RDD[Int] = _
-
- before {
- sc = new SparkContext("local", "test")
- blockManager = mock[BlockManager]
- cacheManager = new CacheManager(blockManager)
- split = new Partition { override def index: Int = 0 }
- rdd = new RDD[Int](sc, Nil) {
- override def getPartitions: Array[Partition] = Array(split)
- override val getDependencies = List[Dependency[_]]()
- override def compute(split: Partition, context: TaskContext): Iterator[Int] =
- Array(1, 2, 3, 4).iterator
- }
- rdd2 = new RDD[Int](sc, List(new OneToOneDependency(rdd))) {
- override def getPartitions: Array[Partition] = firstParent[Int].partitions
- override def compute(split: Partition, context: TaskContext): Iterator[Int] =
- firstParent[Int].iterator(split, context)
- }.cache()
- rdd3 = new RDD[Int](sc, List(new OneToOneDependency(rdd2))) {
- override def getPartitions: Array[Partition] = firstParent[Int].partitions
- override def compute(split: Partition, context: TaskContext): Iterator[Int] =
- firstParent[Int].iterator(split, context)
- }.cache()
- }
-
- test("get uncached rdd") {
- // Do not mock this test, because attempting to match Array[Any], which is not covariant,
- // in blockManager.put is a losing battle. You have been warned.
- blockManager = sc.env.blockManager
- cacheManager = sc.env.cacheManager
- val context = TaskContext.empty()
- val computeValue = cacheManager.getOrCompute(rdd, split, context, StorageLevel.MEMORY_ONLY)
- val getValue = blockManager.get(RDDBlockId(rdd.id, split.index))
- assert(computeValue.toList === List(1, 2, 3, 4))
- assert(getValue.isDefined, "Block cached from getOrCompute is not found!")
- assert(getValue.get.data.toList === List(1, 2, 3, 4))
- }
-
- test("get cached rdd") {
- val result = new BlockResult(Array(5, 6, 7).iterator, DataReadMethod.Memory, 12)
- when(blockManager.get(RDDBlockId(0, 0))).thenReturn(Some(result))
-
- val context = TaskContext.empty()
- val value = cacheManager.getOrCompute(rdd, split, context, StorageLevel.MEMORY_ONLY)
- assert(value.toList === List(5, 6, 7))
- }
-
- test("verify task metrics updated correctly") {
- cacheManager = sc.env.cacheManager
- val context = TaskContext.empty()
- try {
- TaskContext.setTaskContext(context)
- sc.env.blockManager.registerTask(0)
- cacheManager.getOrCompute(rdd3, split, context, StorageLevel.MEMORY_ONLY)
- assert(context.taskMetrics.updatedBlockStatuses.size === 2)
- } finally {
- TaskContext.unset()
- }
- }
-}
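
The deleted metrics test above installs a TaskContext on the current thread and tears it down in a finally block. A minimal sketch of that pattern, using only the calls visible in the deleted code (the body is illustrative):

  import org.apache.spark.TaskContext

  val context = TaskContext.empty()
  try {
    TaskContext.setTaskContext(context)
    // ... exercise code that reads TaskContext.get() on this thread ...
  } finally {
    TaskContext.unset()
  }
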
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockInfoManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockInfoManagerSuite.scala
index 662b18f667..fe83fc722a 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockInfoManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockInfoManagerSuite.scala
@@ -80,10 +80,18 @@ class BlockInfoManagerSuite extends SparkFunSuite with BeforeAndAfterEach {
withTaskId(1) {
assert(blockInfoManager.lockNewBlockForWriting("block", blockInfo))
assert(blockInfoManager.get("block").get eq blockInfo)
- assert(!blockInfoManager.lockNewBlockForWriting("block", newBlockInfo()))
- assert(blockInfoManager.get("block").get eq blockInfo)
assert(blockInfo.readerCount === 0)
assert(blockInfo.writerTask === 1)
+ // Downgrade the lock so that the second call doesn't block:
+ blockInfoManager.downgradeLock("block")
+ assert(blockInfo.readerCount === 1)
+ assert(blockInfo.writerTask === BlockInfo.NO_WRITER)
+ assert(!blockInfoManager.lockNewBlockForWriting("block", newBlockInfo()))
+ assert(blockInfo.readerCount === 2)
+ assert(blockInfoManager.get("block").get eq blockInfo)
+ assert(blockInfo.readerCount === 2)
+ assert(blockInfo.writerTask === BlockInfo.NO_WRITER)
+ blockInfoManager.unlock("block")
blockInfoManager.unlock("block")
assert(blockInfo.readerCount === 0)
assert(blockInfo.writerTask === BlockInfo.NO_WRITER)
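
The added assertions above exercise lock downgrading: a writer trades its exclusive lock for a shared read lock, clearing writerTask and bumping readerCount. A minimal sketch under the same BlockInfoManager API:

  withTaskId(1) {
    val info = newBlockInfo()
    assert(blockInfoManager.lockNewBlockForWriting("b", info)) // exclusive write lock
    blockInfoManager.downgradeLock("b")                        // now a shared read lock
    assert(info.writerTask === BlockInfo.NO_WRITER)
    assert(info.readerCount === 1)
    blockInfoManager.unlock("b")                               // drop the read lock
  }
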
@@ -92,6 +100,67 @@ class BlockInfoManagerSuite extends SparkFunSuite with BeforeAndAfterEach {
assert(blockInfoManager.getNumberOfMapEntries === initialNumMapEntries + 1)
}
+ test("lockNewBlockForWriting blocks while write lock is held, then returns false after release") {
+ withTaskId(0) {
+ assert(blockInfoManager.lockNewBlockForWriting("block", newBlockInfo()))
+ }
+ val lock1Future = Future {
+ withTaskId(1) {
+ blockInfoManager.lockNewBlockForWriting("block", newBlockInfo())
+ }
+ }
+ val lock2Future = Future {
+ withTaskId(2) {
+ blockInfoManager.lockNewBlockForWriting("block", newBlockInfo())
+ }
+ }
+ Thread.sleep(300) // Hack to try to ensure that both future tasks are waiting
+ withTaskId(0) {
+ blockInfoManager.downgradeLock("block")
+ }
+ // After downgrading to a read lock, both threads should wake up and acquire the shared
+ // read lock.
+ assert(!Await.result(lock1Future, 1.seconds))
+ assert(!Await.result(lock2Future, 1.seconds))
+ assert(blockInfoManager.get("block").get.readerCount === 3)
+ }
+
+ test("lockNewBlockForWriting blocks while write lock is held, then returns true after removal") {
+ withTaskId(0) {
+ assert(blockInfoManager.lockNewBlockForWriting("block", newBlockInfo()))
+ }
+ val lock1Future = Future {
+ withTaskId(1) {
+ blockInfoManager.lockNewBlockForWriting("block", newBlockInfo())
+ }
+ }
+ val lock2Future = Future {
+ withTaskId(2) {
+ blockInfoManager.lockNewBlockForWriting("block", newBlockInfo())
+ }
+ }
+ Thread.sleep(300) // Hack to try to ensure that both future tasks are waiting
+ withTaskId(0) {
+ blockInfoManager.removeBlock("block")
+ }
+ // After removing the block, the write lock is released. Both threads should wake up but only
+ // one should acquire the write lock. The second thread should block until the winner of the
+ // write race releases its lock.
+ val winningFuture: Future[Boolean] =
+ Await.ready(Future.firstCompletedOf(Seq(lock1Future, lock2Future)), 1.seconds)
+ assert(winningFuture.value.get.get)
+ val winningTID = blockInfoManager.get("block").get.writerTask
+ assert(winningTID === 1 || winningTID === 2)
+ val losingFuture: Future[Boolean] = if (winningTID == 1) lock2Future else lock1Future
+ assert(!losingFuture.isCompleted)
+ // Once the writer releases its lock, the blocked future should wake up again and complete.
+ withTaskId(winningTID) {
+ blockInfoManager.unlock("block")
+ }
+ assert(!Await.result(losingFuture, 1.seconds))
+ assert(blockInfoManager.get("block").get.readerCount === 1)
+ }
+
test("read locks are reentrant") {
withTaskId(1) {
assert(blockInfoManager.lockNewBlockForWriting("block", newBlockInfo()))
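
Both new tests drive a blocking lockNewBlockForWriting call from Futures and then unblock it from the main thread. A sketch of that scaffolding, assuming the suite imports scala.concurrent with the global ExecutionContext and the duration DSL:

  import scala.concurrent.{Await, Future}
  import scala.concurrent.ExecutionContext.Implicits.global
  import scala.concurrent.duration._

  val attempt: Future[Boolean] = Future {
    withTaskId(1) {
      blockInfoManager.lockNewBlockForWriting("block", newBlockInfo()) // blocks here
    }
  }
  Thread.sleep(300) // crude, as above: let the future reach the blocking call
  withTaskId(0) { blockInfoManager.downgradeLock("block") } // release the write lock
  assert(!Await.result(attempt, 1.second)) // false: the block already exists
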
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
index a94d8b424d..ae1faf5d98 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
@@ -190,7 +190,6 @@ class BlockManagerReplicationSuite extends SparkFunSuite with Matchers with Befo
def putBlockAndGetLocations(blockId: String, level: StorageLevel): Set[BlockManagerId] = {
stores.head.putSingle(blockId, new Array[Byte](blockSize), level)
- stores.head.releaseLock(blockId)
val locations = master.getLocations(blockId).sortBy { _.executorId }.toSet
stores.foreach { _.removeBlock(blockId) }
master.removeBlock(blockId)
@@ -252,7 +251,6 @@ class BlockManagerReplicationSuite extends SparkFunSuite with Matchers with Befo
// Insert a block with 2x replication and return the number of copies of the block
def replicateAndGetNumCopies(blockId: String): Int = {
store.putSingle(blockId, new Array[Byte](1000), StorageLevel.MEMORY_AND_DISK_2)
- store.releaseLock(blockId)
val numLocations = master.getLocations(blockId).size
allStores.foreach { _.removeBlock(blockId) }
numLocations
@@ -290,7 +288,6 @@ class BlockManagerReplicationSuite extends SparkFunSuite with Matchers with Befo
def replicateAndGetNumCopies(blockId: String, replicationFactor: Int): Int = {
val storageLevel = StorageLevel(true, true, false, true, replicationFactor)
initialStores.head.putSingle(blockId, new Array[Byte](blockSize), storageLevel)
- initialStores.head.releaseLock(blockId)
val numLocations = master.getLocations(blockId).size
allStores.foreach { _.removeBlock(blockId) }
numLocations
@@ -358,7 +355,6 @@ class BlockManagerReplicationSuite extends SparkFunSuite with Matchers with Befo
val blockId = new TestBlockId(
"block-with-" + storageLevel.description.replace(" ", "-").toLowerCase)
stores(0).putSingle(blockId, new Array[Byte](blockSize), storageLevel)
- stores(0).releaseLock(blockId)
// Assert that master knows two locations for the block
val blockLocations = master.getLocations(blockId).map(_.executorId).toSet
@@ -397,7 +393,6 @@ class BlockManagerReplicationSuite extends SparkFunSuite with Matchers with Befo
(1 to 10).foreach {
i =>
testStore.putSingle(s"dummy-block-$i", new Array[Byte](1000), MEMORY_ONLY_SER)
- testStore.releaseLock(s"dummy-block-$i")
}
(1 to 10).foreach {
i => testStore.removeBlock(s"dummy-block-$i")
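
replicateAndGetNumCopies above constructs its StorageLevel positionally. For readability, a sketch spelling out those flags, assuming the StorageLevel(useDisk, useMemory, useOffHeap, deserialized, replication) factory used in the hunk:

  import org.apache.spark.storage.StorageLevel

  // Equivalent to StorageLevel(true, true, false, true, 3) above.
  val level = StorageLevel(
    /* useDisk = */ true,
    /* useMemory = */ true,
    /* useOffHeap = */ false,
    /* deserialized = */ true,
    /* replication = */ 3)
  assert(level.replication === 3)
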
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index e4ab9ee0eb..89b427049b 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -172,9 +172,9 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
val a3 = new Array[Byte](4000)
// Putting a1, a2 and a3 in memory and telling master only about a1 and a2
- store.putSingleAndReleaseLock("a1", a1, StorageLevel.MEMORY_ONLY)
- store.putSingleAndReleaseLock("a2", a2, StorageLevel.MEMORY_ONLY)
- store.putSingleAndReleaseLock("a3", a3, StorageLevel.MEMORY_ONLY, tellMaster = false)
+ store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY)
+ store.putSingle("a2", a2, StorageLevel.MEMORY_ONLY)
+ store.putSingle("a3", a3, StorageLevel.MEMORY_ONLY, tellMaster = false)
// Checking whether blocks are in memory
assert(store.getSingleAndReleaseLock("a1").isDefined, "a1 was not in store")
@@ -205,7 +205,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
val a1 = new Array[Byte](400)
val a2 = new Array[Byte](400)
- store.putSingleAndReleaseLock("a1", a1, StorageLevel.MEMORY_ONLY_2)
+ store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY_2)
store2.putSingle("a2", a2, StorageLevel.MEMORY_ONLY_2)
assert(master.getLocations("a1").size === 2, "master did not report 2 locations for a1")
assert(master.getLocations("a2").size === 2, "master did not report 2 locations for a2")
@@ -218,9 +218,9 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
val a3 = new Array[Byte](4000)
// Putting a1, a2 and a3 in memory and telling master only about a1 and a2
- store.putSingleAndReleaseLock("a1-to-remove", a1, StorageLevel.MEMORY_ONLY)
- store.putSingleAndReleaseLock("a2-to-remove", a2, StorageLevel.MEMORY_ONLY)
- store.putSingleAndReleaseLock("a3-to-remove", a3, StorageLevel.MEMORY_ONLY, tellMaster = false)
+ store.putSingle("a1-to-remove", a1, StorageLevel.MEMORY_ONLY)
+ store.putSingle("a2-to-remove", a2, StorageLevel.MEMORY_ONLY)
+ store.putSingle("a3-to-remove", a3, StorageLevel.MEMORY_ONLY, tellMaster = false)
// Checking whether blocks are in memory and memory size
val memStatus = master.getMemoryStatus.head._2
@@ -265,9 +265,9 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
val a2 = new Array[Byte](4000)
val a3 = new Array[Byte](4000)
// Putting a1, a2 and a3 in memory.
- store.putSingleAndReleaseLock(rdd(0, 0), a1, StorageLevel.MEMORY_ONLY)
- store.putSingleAndReleaseLock(rdd(0, 1), a2, StorageLevel.MEMORY_ONLY)
- store.putSingleAndReleaseLock("nonrddblock", a3, StorageLevel.MEMORY_ONLY)
+ store.putSingle(rdd(0, 0), a1, StorageLevel.MEMORY_ONLY)
+ store.putSingle(rdd(0, 1), a2, StorageLevel.MEMORY_ONLY)
+ store.putSingle("nonrddblock", a3, StorageLevel.MEMORY_ONLY)
master.removeRdd(0, blocking = false)
eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
@@ -283,8 +283,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
master.getLocations("nonrddblock") should have size (1)
}
- store.putSingleAndReleaseLock(rdd(0, 0), a1, StorageLevel.MEMORY_ONLY)
- store.putSingleAndReleaseLock(rdd(0, 1), a2, StorageLevel.MEMORY_ONLY)
+ store.putSingle(rdd(0, 0), a1, StorageLevel.MEMORY_ONLY)
+ store.putSingle(rdd(0, 1), a2, StorageLevel.MEMORY_ONLY)
master.removeRdd(0, blocking = true)
store.getSingleAndReleaseLock(rdd(0, 0)) should be (None)
master.getLocations(rdd(0, 0)) should have size 0
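
Because removeRdd with blocking = false returns before the blocks disappear, the test polls with ScalaTest's Eventually, as seen in the hunk above. A minimal sketch of that idiom:

  import org.scalatest.concurrent.Eventually._
  import org.scalatest.time.SpanSugar._

  // Retry the assertion until it passes or the timeout elapses.
  eventually(timeout(1000.milliseconds), interval(10.milliseconds)) {
    assert(store.getSingleAndReleaseLock(rdd(0, 0)).isEmpty)
  }
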
@@ -308,10 +308,10 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
// insert broadcast blocks in both the stores
Seq(driverStore, executorStore).foreach { case s =>
- s.putSingleAndReleaseLock(broadcast0BlockId, a1, StorageLevel.DISK_ONLY)
- s.putSingleAndReleaseLock(broadcast1BlockId, a2, StorageLevel.DISK_ONLY)
- s.putSingleAndReleaseLock(broadcast2BlockId, a3, StorageLevel.DISK_ONLY)
- s.putSingleAndReleaseLock(broadcast2BlockId2, a4, StorageLevel.DISK_ONLY)
+ s.putSingle(broadcast0BlockId, a1, StorageLevel.DISK_ONLY)
+ s.putSingle(broadcast1BlockId, a2, StorageLevel.DISK_ONLY)
+ s.putSingle(broadcast2BlockId, a3, StorageLevel.DISK_ONLY)
+ s.putSingle(broadcast2BlockId2, a4, StorageLevel.DISK_ONLY)
}
// verify whether the blocks exist in both the stores
@@ -366,7 +366,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
store = makeBlockManager(2000)
val a1 = new Array[Byte](400)
- store.putSingleAndReleaseLock("a1", a1, StorageLevel.MEMORY_ONLY)
+ store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY)
assert(store.getSingleAndReleaseLock("a1").isDefined, "a1 was not in store")
assert(master.getLocations("a1").size > 0, "master was not told about a1")
@@ -384,13 +384,13 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
val a1 = new Array[Byte](400)
val a2 = new Array[Byte](400)
- store.putSingleAndReleaseLock("a1", a1, StorageLevel.MEMORY_ONLY)
+ store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY)
assert(master.getLocations("a1").size > 0, "master was not told about a1")
master.removeExecutor(store.blockManagerId.executorId)
assert(master.getLocations("a1").size == 0, "a1 was not removed from master")
- store.putSingleAndReleaseLock("a2", a2, StorageLevel.MEMORY_ONLY)
+ store.putSingle("a2", a2, StorageLevel.MEMORY_ONLY)
store.waitForAsyncReregister()
assert(master.getLocations("a1").size > 0, "a1 was not reregistered with master")
@@ -407,13 +407,13 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
master.removeExecutor(store.blockManagerId.executorId)
val t1 = new Thread {
override def run() {
- store.putIteratorAndReleaseLock(
+ store.putIterator(
"a2", a2.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
}
}
val t2 = new Thread {
override def run() {
- store.putSingleAndReleaseLock("a1", a1, StorageLevel.MEMORY_ONLY)
+ store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY)
}
}
val t3 = new Thread {
@@ -441,11 +441,11 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
val list2 = List(new Array[Byte](500), new Array[Byte](1000), new Array[Byte](1500))
val list1SizeEstimate = SizeEstimator.estimate(list1.iterator.toArray)
val list2SizeEstimate = SizeEstimator.estimate(list2.iterator.toArray)
- store.putIteratorAndReleaseLock(
+ store.putIterator(
"list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
- store.putIteratorAndReleaseLock(
+ store.putIterator(
"list2memory", list2.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
- store.putIteratorAndReleaseLock(
+ store.putIterator(
"list2disk", list2.iterator, StorageLevel.DISK_ONLY, tellMaster = true)
val list1Get = store.get("list1")
assert(list1Get.isDefined, "list1 expected to be in store")
@@ -486,9 +486,9 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
store2 = makeBlockManager(8000, "executor2")
store3 = makeBlockManager(8000, "executor3")
val list1 = List(new Array[Byte](4000))
- store2.putIteratorAndReleaseLock(
+ store2.putIterator(
"list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
- store3.putIteratorAndReleaseLock(
+ store3.putIterator(
"list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
assert(store.getRemoteBytes("list1").isDefined, "list1Get expected to be fetched")
store2.stop()
@@ -515,15 +515,15 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
val a1 = new Array[Byte](4000)
val a2 = new Array[Byte](4000)
val a3 = new Array[Byte](4000)
- store.putSingleAndReleaseLock("a1", a1, storageLevel)
- store.putSingleAndReleaseLock("a2", a2, storageLevel)
- store.putSingleAndReleaseLock("a3", a3, storageLevel)
+ store.putSingle("a1", a1, storageLevel)
+ store.putSingle("a2", a2, storageLevel)
+ store.putSingle("a3", a3, storageLevel)
assert(store.getSingleAndReleaseLock("a2").isDefined, "a2 was not in store")
assert(store.getSingleAndReleaseLock("a3").isDefined, "a3 was not in store")
assert(store.getSingleAndReleaseLock("a1") === None, "a1 was in store")
assert(store.getSingleAndReleaseLock("a2").isDefined, "a2 was not in store")
// At this point a2 was gotten last, so LRU will get rid of a3
- store.putSingleAndReleaseLock("a1", a1, storageLevel)
+ store.putSingle("a1", a1, storageLevel)
assert(store.getSingleAndReleaseLock("a1").isDefined, "a1 was not in store")
assert(store.getSingleAndReleaseLock("a2").isDefined, "a2 was not in store")
assert(store.getSingleAndReleaseLock("a3") === None, "a3 was in store")
@@ -534,9 +534,9 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
val a1 = new Array[Byte](4000)
val a2 = new Array[Byte](4000)
val a3 = new Array[Byte](4000)
- store.putSingleAndReleaseLock(rdd(0, 1), a1, StorageLevel.MEMORY_ONLY)
- store.putSingleAndReleaseLock(rdd(0, 2), a2, StorageLevel.MEMORY_ONLY)
- store.putSingleAndReleaseLock(rdd(0, 3), a3, StorageLevel.MEMORY_ONLY)
+ store.putSingle(rdd(0, 1), a1, StorageLevel.MEMORY_ONLY)
+ store.putSingle(rdd(0, 2), a2, StorageLevel.MEMORY_ONLY)
+ store.putSingle(rdd(0, 3), a3, StorageLevel.MEMORY_ONLY)
// Even though we accessed rdd_0_3 last, it should not have replaced partitions 1 and 2
// from the same RDD
assert(store.getSingleAndReleaseLock(rdd(0, 3)) === None, "rdd_0_3 was in store")
@@ -550,9 +550,9 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
test("in-memory LRU for partitions of multiple RDDs") {
store = makeBlockManager(12000)
- store.putSingleAndReleaseLock(rdd(0, 1), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
- store.putSingleAndReleaseLock(rdd(0, 2), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
- store.putSingleAndReleaseLock(rdd(1, 1), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
+ store.putSingle(rdd(0, 1), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
+ store.putSingle(rdd(0, 2), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
+ store.putSingle(rdd(1, 1), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
// At this point rdd_1_1 should've replaced rdd_0_1
assert(store.memoryStore.contains(rdd(1, 1)), "rdd_1_1 was not in store")
assert(!store.memoryStore.contains(rdd(0, 1)), "rdd_0_1 was in store")
@@ -560,8 +560,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
// Do a get() on rdd_0_2 so that it is the most recently used item
assert(store.getSingleAndReleaseLock(rdd(0, 2)).isDefined, "rdd_0_2 was not in store")
// Put in more partitions from RDD 0; they should replace rdd_1_1
- store.putSingleAndReleaseLock(rdd(0, 3), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
- store.putSingleAndReleaseLock(rdd(0, 4), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
+ store.putSingle(rdd(0, 3), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
+ store.putSingle(rdd(0, 4), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
// Now rdd_1_1 should be dropped to add rdd_0_3, but then rdd_0_2 should *not* be dropped
// when we try to add rdd_0_4.
assert(!store.memoryStore.contains(rdd(1, 1)), "rdd_1_1 was in store")
@@ -576,9 +576,9 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
val a1 = new Array[Byte](400)
val a2 = new Array[Byte](400)
val a3 = new Array[Byte](400)
- store.putSingleAndReleaseLock("a1", a1, StorageLevel.DISK_ONLY)
- store.putSingleAndReleaseLock("a2", a2, StorageLevel.DISK_ONLY)
- store.putSingleAndReleaseLock("a3", a3, StorageLevel.DISK_ONLY)
+ store.putSingle("a1", a1, StorageLevel.DISK_ONLY)
+ store.putSingle("a2", a2, StorageLevel.DISK_ONLY)
+ store.putSingle("a3", a3, StorageLevel.DISK_ONLY)
assert(store.getSingleAndReleaseLock("a2").isDefined, "a2 was in store")
assert(store.getSingleAndReleaseLock("a3").isDefined, "a3 was in store")
assert(store.getSingleAndReleaseLock("a1").isDefined, "a1 was in store")
@@ -607,9 +607,9 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
val a1 = new Array[Byte](4000)
val a2 = new Array[Byte](4000)
val a3 = new Array[Byte](4000)
- store.putSingleAndReleaseLock("a1", a1, storageLevel)
- store.putSingleAndReleaseLock("a2", a2, storageLevel)
- store.putSingleAndReleaseLock("a3", a3, storageLevel)
+ store.putSingle("a1", a1, storageLevel)
+ store.putSingle("a2", a2, storageLevel)
+ store.putSingle("a3", a3, storageLevel)
assert(accessMethod(store)("a2").isDefined, "a2 was not in store")
assert(accessMethod(store)("a3").isDefined, "a3 was not in store")
assert(store.memoryStore.getValues("a1").isEmpty, "a1 was in memory store")
@@ -624,15 +624,15 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
val a3 = new Array[Byte](4000)
val a4 = new Array[Byte](4000)
// First store a1 and a2, both in memory, and a3, on disk only
- store.putSingleAndReleaseLock("a1", a1, StorageLevel.MEMORY_ONLY_SER)
- store.putSingleAndReleaseLock("a2", a2, StorageLevel.MEMORY_ONLY_SER)
- store.putSingleAndReleaseLock("a3", a3, StorageLevel.DISK_ONLY)
+ store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY_SER)
+ store.putSingle("a2", a2, StorageLevel.MEMORY_ONLY_SER)
+ store.putSingle("a3", a3, StorageLevel.DISK_ONLY)
// At this point LRU should not kick in because a3 is only on disk
assert(store.getSingleAndReleaseLock("a1").isDefined, "a1 was not in store")
assert(store.getSingleAndReleaseLock("a2").isDefined, "a2 was not in store")
assert(store.getSingleAndReleaseLock("a3").isDefined, "a3 was not in store")
// Now let's add in a4, which uses both disk and memory; a1 should drop out
- store.putSingleAndReleaseLock("a4", a4, StorageLevel.MEMORY_AND_DISK_SER)
+ store.putSingle("a4", a4, StorageLevel.MEMORY_AND_DISK_SER)
assert(store.getSingleAndReleaseLock("a1") == None, "a1 was in store")
assert(store.getSingleAndReleaseLock("a2").isDefined, "a2 was not in store")
assert(store.getSingleAndReleaseLock("a3").isDefined, "a3 was not in store")
@@ -644,11 +644,11 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
val list1 = List(new Array[Byte](2000), new Array[Byte](2000))
val list2 = List(new Array[Byte](2000), new Array[Byte](2000))
val list3 = List(new Array[Byte](2000), new Array[Byte](2000))
- store.putIteratorAndReleaseLock(
+ store.putIterator(
"list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
- store.putIteratorAndReleaseLock(
+ store.putIterator(
"list2", list2.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
- store.putIteratorAndReleaseLock(
+ store.putIterator(
"list3", list3.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
assert(store.getAndReleaseLock("list2").isDefined, "list2 was not in store")
assert(store.get("list2").get.data.size === 2)
@@ -658,7 +658,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
assert(store.getAndReleaseLock("list2").isDefined, "list2 was not in store")
assert(store.get("list2").get.data.size === 2)
// At this point list2 was gotten last, so LRU will get rid of list3
- store.putIteratorAndReleaseLock(
+ store.putIterator(
"list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
assert(store.getAndReleaseLock("list1").isDefined, "list1 was not in store")
assert(store.get("list1").get.data.size === 2)
@@ -674,11 +674,11 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
val list3 = List(new Array[Byte](2000), new Array[Byte](2000))
val list4 = List(new Array[Byte](2000), new Array[Byte](2000))
// First store list1 and list2, both in memory, and list3, on disk only
- store.putIteratorAndReleaseLock(
+ store.putIterator(
"list1", list1.iterator, StorageLevel.MEMORY_ONLY_SER, tellMaster = true)
- store.putIteratorAndReleaseLock(
+ store.putIterator(
"list2", list2.iterator, StorageLevel.MEMORY_ONLY_SER, tellMaster = true)
- store.putIteratorAndReleaseLock(
+ store.putIterator(
"list3", list3.iterator, StorageLevel.DISK_ONLY, tellMaster = true)
val listForSizeEstimate = new ArrayBuffer[Any]
listForSizeEstimate ++= list1.iterator
@@ -697,7 +697,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
assert(store.getAndReleaseLock("list3").isDefined, "list3 was not in store")
assert(store.get("list3").get.data.size === 2)
// Now let's add in list4, which uses both disk and memory; list1 should drop out
- store.putIteratorAndReleaseLock(
+ store.putIterator(
"list4", list4.iterator, StorageLevel.MEMORY_AND_DISK_SER, tellMaster = true)
assert(store.getAndReleaseLock("list1") === None, "list1 was in store")
assert(store.getAndReleaseLock("list2").isDefined, "list2 was not in store")
@@ -722,9 +722,9 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
test("overly large block") {
store = makeBlockManager(5000)
- store.putSingleAndReleaseLock("a1", new Array[Byte](10000), StorageLevel.MEMORY_ONLY)
+ store.putSingle("a1", new Array[Byte](10000), StorageLevel.MEMORY_ONLY)
assert(store.getSingleAndReleaseLock("a1") === None, "a1 was in store")
- store.putSingleAndReleaseLock("a2", new Array[Byte](10000), StorageLevel.MEMORY_AND_DISK)
+ store.putSingle("a2", new Array[Byte](10000), StorageLevel.MEMORY_AND_DISK)
assert(store.memoryStore.getValues("a2") === None, "a2 was in memory store")
assert(store.getSingleAndReleaseLock("a2").isDefined, "a2 was not in store")
}
@@ -733,7 +733,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
try {
conf.set("spark.shuffle.compress", "true")
store = makeBlockManager(20000, "exec1")
- store.putSingleAndReleaseLock(
+ store.putSingle(
ShuffleBlockId(0, 0, 0), new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER)
assert(store.memoryStore.getSize(ShuffleBlockId(0, 0, 0)) <= 100,
"shuffle_0_0_0 was not compressed")
@@ -742,7 +742,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
conf.set("spark.shuffle.compress", "false")
store = makeBlockManager(20000, "exec2")
- store.putSingleAndReleaseLock(
+ store.putSingle(
ShuffleBlockId(0, 0, 0), new Array[Byte](10000), StorageLevel.MEMORY_ONLY_SER)
assert(store.memoryStore.getSize(ShuffleBlockId(0, 0, 0)) >= 10000,
"shuffle_0_0_0 was compressed")
@@ -751,7 +751,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
conf.set("spark.broadcast.compress", "true")
store = makeBlockManager(20000, "exec3")
- store.putSingleAndReleaseLock(
+ store.putSingle(
BroadcastBlockId(0), new Array[Byte](10000), StorageLevel.MEMORY_ONLY_SER)
assert(store.memoryStore.getSize(BroadcastBlockId(0)) <= 1000,
"broadcast_0 was not compressed")
@@ -760,7 +760,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
conf.set("spark.broadcast.compress", "false")
store = makeBlockManager(20000, "exec4")
- store.putSingleAndReleaseLock(
+ store.putSingle(
BroadcastBlockId(0), new Array[Byte](10000), StorageLevel.MEMORY_ONLY_SER)
assert(store.memoryStore.getSize(BroadcastBlockId(0)) >= 10000, "broadcast_0 was compressed")
store.stop()
@@ -768,21 +768,21 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
conf.set("spark.rdd.compress", "true")
store = makeBlockManager(20000, "exec5")
- store.putSingleAndReleaseLock(rdd(0, 0), new Array[Byte](10000), StorageLevel.MEMORY_ONLY_SER)
+ store.putSingle(rdd(0, 0), new Array[Byte](10000), StorageLevel.MEMORY_ONLY_SER)
assert(store.memoryStore.getSize(rdd(0, 0)) <= 1000, "rdd_0_0 was not compressed")
store.stop()
store = null
conf.set("spark.rdd.compress", "false")
store = makeBlockManager(20000, "exec6")
- store.putSingleAndReleaseLock(rdd(0, 0), new Array[Byte](10000), StorageLevel.MEMORY_ONLY_SER)
+ store.putSingle(rdd(0, 0), new Array[Byte](10000), StorageLevel.MEMORY_ONLY_SER)
assert(store.memoryStore.getSize(rdd(0, 0)) >= 10000, "rdd_0_0 was compressed")
store.stop()
store = null
// Check that any other block types are also kept uncompressed
store = makeBlockManager(20000, "exec7")
- store.putSingleAndReleaseLock("other_block", new Array[Byte](10000), StorageLevel.MEMORY_ONLY)
+ store.putSingle("other_block", new Array[Byte](10000), StorageLevel.MEMORY_ONLY)
assert(store.memoryStore.getSize("other_block") >= 10000, "other_block was compressed")
store.stop()
store = null
@@ -810,7 +810,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
class UnserializableClass
val a1 = new UnserializableClass
intercept[java.io.NotSerializableException] {
- store.putSingleAndReleaseLock("a1", a1, StorageLevel.DISK_ONLY)
+ store.putSingle("a1", a1, StorageLevel.DISK_ONLY)
}
// Make sure get a1 doesn't hang and returns None.
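
The intercept call above is ScalaTest's standard way to assert that an expression throws. A short sketch of the idiom, reusing the suite's locally defined UnserializableClass:

  import java.io.NotSerializableException
  import org.scalatest.Assertions._

  // intercept fails the test if no exception (or the wrong type) is thrown,
  // and otherwise returns the caught exception for further inspection.
  val ex = intercept[NotSerializableException] {
    store.putSingle("a1", new UnserializableClass, StorageLevel.DISK_ONLY)
  }
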
@@ -882,7 +882,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
// 1 updated block (i.e. list1)
val updatedBlocks1 = getUpdatedBlocks {
- store.putIteratorAndReleaseLock(
+ store.putIterator(
"list1", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
}
assert(updatedBlocks1.size === 1)
@@ -891,7 +891,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
// 1 updated block (i.e. list2)
val updatedBlocks2 = getUpdatedBlocks {
- store.putIteratorAndReleaseLock(
+ store.putIterator(
"list2", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true)
}
assert(updatedBlocks2.size === 1)
@@ -900,7 +900,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
// 2 updated blocks - list1 is kicked out of memory while list3 is added
val updatedBlocks3 = getUpdatedBlocks {
- store.putIteratorAndReleaseLock(
+ store.putIterator(
"list3", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
}
assert(updatedBlocks3.size === 2)
@@ -915,7 +915,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
// 2 updated blocks - list2 is kicked out of memory (but put on disk) while list4 is added
val updatedBlocks4 = getUpdatedBlocks {
- store.putIteratorAndReleaseLock(
+ store.putIterator(
"list4", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
}
assert(updatedBlocks4.size === 2)
@@ -931,7 +931,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
// No updated blocks - list5 is too big to fit in store and nothing is kicked out
val updatedBlocks5 = getUpdatedBlocks {
- store.putIteratorAndReleaseLock(
+ store.putIterator(
"list5", bigList.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
}
assert(updatedBlocks5.size === 0)
@@ -956,11 +956,11 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
val list = List.fill(2)(new Array[Byte](2000))
// Tell master. By LRU, only list2 and list3 remain.
- store.putIteratorAndReleaseLock(
+ store.putIterator(
"list1", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
- store.putIteratorAndReleaseLock(
+ store.putIterator(
"list2", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true)
- store.putIteratorAndReleaseLock(
+ store.putIterator(
"list3", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
// getLocations and getBlockStatus should yield the same locations
@@ -975,11 +975,11 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
assert(store.master.getBlockStatus("list3", askSlaves = true).size === 1)
// This time don't tell master and see what happens. By LRU, only list5 and list6 remain.
- store.putIteratorAndReleaseLock(
+ store.putIterator(
"list4", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = false)
- store.putIteratorAndReleaseLock(
+ store.putIterator(
"list5", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = false)
- store.putIteratorAndReleaseLock(
+ store.putIterator(
"list6", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = false)
// getLocations should return nothing because the master is not informed
@@ -1001,11 +1001,11 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
val list = List.fill(2)(new Array[Byte](100))
// insert some blocks
- store.putIteratorAndReleaseLock(
+ store.putIterator(
"list1", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true)
- store.putIteratorAndReleaseLock(
+ store.putIterator(
"list2", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true)
- store.putIteratorAndReleaseLock(
+ store.putIterator(
"list3", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true)
// getLocations and getBlockStatus should yield the same locations
@@ -1015,11 +1015,11 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
=== 1)
// insert some more blocks
- store.putIteratorAndReleaseLock(
+ store.putIterator(
"newlist1", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true)
- store.putIteratorAndReleaseLock(
+ store.putIterator(
"newlist2", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = false)
- store.putIteratorAndReleaseLock(
+ store.putIterator(
"newlist3", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = false)
// getLocations and getBlockStatus should yield the same locations
@@ -1030,7 +1030,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
val blockIds = Seq(RDDBlockId(1, 0), RDDBlockId(1, 1), RDDBlockId(2, 0))
blockIds.foreach { blockId =>
- store.putIteratorAndReleaseLock(
+ store.putIterator(
blockId, list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
}
val matchedBlockIds = store.master.getMatchingBlockIds(_ match {
@@ -1042,12 +1042,12 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
test("SPARK-1194 regression: fix the same-RDD rule for cache replacement") {
store = makeBlockManager(12000)
- store.putSingleAndReleaseLock(rdd(0, 0), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
- store.putSingleAndReleaseLock(rdd(1, 0), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
+ store.putSingle(rdd(0, 0), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
+ store.putSingle(rdd(1, 0), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
// Access rdd_1_0 to ensure it's not least recently used.
assert(store.getSingleAndReleaseLock(rdd(1, 0)).isDefined, "rdd_1_0 was not in store")
// According to the same-RDD rule, rdd_1_0 should be replaced here.
- store.putSingleAndReleaseLock(rdd(0, 1), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
+ store.putSingle(rdd(0, 1), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
// rdd_1_0 should have been replaced, even though it's not the least recently used.
assert(store.memoryStore.contains(rdd(0, 0)), "rdd_0_0 was not in store")
assert(store.memoryStore.contains(rdd(0, 1)), "rdd_0_1 was not in store")
@@ -1126,8 +1126,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
memoryStore.releasePendingUnrollMemoryForThisTask()
// Unroll with not enough space. This should succeed after kicking out someBlock1.
- store.putIteratorAndReleaseLock("someBlock1", smallList.iterator, StorageLevel.MEMORY_ONLY)
- store.putIteratorAndReleaseLock("someBlock2", smallList.iterator, StorageLevel.MEMORY_ONLY)
+ store.putIterator("someBlock1", smallList.iterator, StorageLevel.MEMORY_ONLY)
+ store.putIterator("someBlock2", smallList.iterator, StorageLevel.MEMORY_ONLY)
unrollResult = memoryStore.unrollSafely("unroll", smallList.iterator)
verifyUnroll(smallList.iterator, unrollResult, shouldBeArray = true)
assert(memoryStore.currentUnrollMemoryForThisTask === 0)
@@ -1138,7 +1138,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
// Unroll huge block with not enough space. Even after ensuring free space of 12000 * 0.4 =
// 4800 bytes, there is still not enough room to unroll this block. This returns an iterator.
// In the meantime, however, we kicked out someBlock2 before giving up.
- store.putIteratorAndReleaseLock("someBlock3", smallList.iterator, StorageLevel.MEMORY_ONLY)
+ store.putIterator("someBlock3", smallList.iterator, StorageLevel.MEMORY_ONLY)
unrollResult = memoryStore.unrollSafely("unroll", bigList.iterator)
verifyUnroll(bigList.iterator, unrollResult, shouldBeArray = false)
assert(memoryStore.currentUnrollMemoryForThisTask > 0) // we returned an iterator
@@ -1170,8 +1170,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
// would not know how to drop them from memory later.
memoryStore.remove("b1")
memoryStore.remove("b2")
- store.putIteratorAndReleaseLock("b1", smallIterator, memOnly)
- store.putIteratorAndReleaseLock("b2", smallIterator, memOnly)
+ store.putIterator("b1", smallIterator, memOnly)
+ store.putIterator("b2", smallIterator, memOnly)
// Unroll with not enough space. This should succeed but kick out b1 in the process.
val result3 = memoryStore.putIterator("b3", smallIterator, memOnly, returnValues = true)
@@ -1182,7 +1182,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
assert(memoryStore.contains("b3"))
assert(memoryStore.currentUnrollMemoryForThisTask === 0)
memoryStore.remove("b3")
- store.putIteratorAndReleaseLock("b3", smallIterator, memOnly)
+ store.putIterator("b3", smallIterator, memOnly)
// Unroll huge block with not enough space. This should fail and kick out b2 in the process.
val result4 = memoryStore.putIterator("b4", bigIterator, memOnly, returnValues = true)
@@ -1209,8 +1209,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
def bigIterator: Iterator[Any] = bigList.iterator.asInstanceOf[Iterator[Any]]
assert(memoryStore.currentUnrollMemoryForThisTask === 0)
- store.putIteratorAndReleaseLock("b1", smallIterator, memAndDisk)
- store.putIteratorAndReleaseLock("b2", smallIterator, memAndDisk)
+ store.putIterator("b1", smallIterator, memAndDisk)
+ store.putIterator("b2", smallIterator, memAndDisk)
// Unroll with not enough space. This should succeed but kick out b1 in the process.
// Memory store should contain b2 and b3, while disk store should contain only b1
@@ -1223,7 +1223,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
assert(!diskStore.contains("b2"))
assert(!diskStore.contains("b3"))
memoryStore.remove("b3")
- store.putIteratorAndReleaseLock("b3", smallIterator, StorageLevel.MEMORY_ONLY)
+ store.putIterator("b3", smallIterator, StorageLevel.MEMORY_ONLY)
assert(memoryStore.currentUnrollMemoryForThisTask === 0)
// Unroll huge block with not enough space. This should fail and drop the new block to disk
@@ -1310,12 +1310,12 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
store = makeBlockManager(12000)
val arr = new Array[Byte](4000)
// First store a1 and a2, both in memory, and a3, on disk only
- store.putSingleAndReleaseLock("a1", arr, StorageLevel.MEMORY_ONLY_SER)
- store.putSingleAndReleaseLock("a2", arr, StorageLevel.MEMORY_ONLY_SER)
+ store.putSingle("a1", arr, StorageLevel.MEMORY_ONLY_SER)
+ store.putSingle("a2", arr, StorageLevel.MEMORY_ONLY_SER)
assert(store.getSingle("a1").isDefined, "a1 was not in store")
assert(store.getSingle("a2").isDefined, "a2 was not in store")
// This put should fail because both a1 and a2 should be read-locked:
- store.putSingleAndReleaseLock("a3", arr, StorageLevel.MEMORY_ONLY_SER)
+ store.putSingle("a3", arr, StorageLevel.MEMORY_ONLY_SER)
assert(store.getSingle("a3").isEmpty, "a3 was in store")
assert(store.getSingle("a1").isDefined, "a1 was not in store")
assert(store.getSingle("a2").isDefined, "a2 was not in store")
@@ -1324,7 +1324,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
store.releaseLock("a2")
// Block a1 is the least-recently accessed, so an LRU eviction policy would evict it before
// block a2. However, a1 is still pinned so this put of a3 should evict a2 instead:
- store.putSingleAndReleaseLock("a3", arr, StorageLevel.MEMORY_ONLY_SER)
+ store.putSingle("a3", arr, StorageLevel.MEMORY_ONLY_SER)
assert(store.getSingle("a2").isEmpty, "a2 was in store")
assert(store.getSingle("a1").isDefined, "a1 was not in store")
assert(store.getSingle("a3").isDefined, "a3 was not in store")
@@ -1335,41 +1335,6 @@ private object BlockManagerSuite {
private implicit class BlockManagerTestUtils(store: BlockManager) {
- def putSingleAndReleaseLock(
- block: BlockId,
- value: Any,
- storageLevel: StorageLevel,
- tellMaster: Boolean): Unit = {
- if (store.putSingle(block, value, storageLevel, tellMaster)) {
- store.releaseLock(block)
- }
- }
-
- def putSingleAndReleaseLock(block: BlockId, value: Any, storageLevel: StorageLevel): Unit = {
- if (store.putSingle(block, value, storageLevel)) {
- store.releaseLock(block)
- }
- }
-
- def putIteratorAndReleaseLock(
- blockId: BlockId,
- values: Iterator[Any],
- level: StorageLevel): Unit = {
- if (store.putIterator(blockId, values, level)) {
- store.releaseLock(blockId)
- }
- }
-
- def putIteratorAndReleaseLock(
- blockId: BlockId,
- values: Iterator[Any],
- level: StorageLevel,
- tellMaster: Boolean): Unit = {
- if (store.putIterator(blockId, values, level, tellMaster)) {
- store.releaseLock(blockId)
- }
- }
-
def dropFromMemoryIfExists(
blockId: BlockId,
data: () => Either[Array[Any], ByteBuffer]): Unit = {