aboutsummaryrefslogtreecommitdiff
path: root/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala')
-rw-r--r--core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala20
1 files changed, 16 insertions, 4 deletions
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
index 75dc04038d..d907add920 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
@@ -374,7 +374,8 @@ trait BlockManagerReplicationBehavior extends SparkFunSuite
// Put the block into one of the stores
val blockId = new TestBlockId(
"block-with-" + storageLevel.description.replace(" ", "-").toLowerCase)
- stores(0).putSingle(blockId, new Array[Byte](blockSize), storageLevel)
+ val testValue = Array.fill[Byte](blockSize)(1)
+ stores(0).putSingle(blockId, testValue, storageLevel)
// Assert that master know two locations for the block
val blockLocations = master.getLocations(blockId).map(_.executorId).toSet
@@ -386,12 +387,23 @@ trait BlockManagerReplicationBehavior extends SparkFunSuite
testStore => blockLocations.contains(testStore.blockManagerId.executorId)
}.foreach { testStore =>
val testStoreName = testStore.blockManagerId.executorId
- assert(
- testStore.getLocalValues(blockId).isDefined, s"$blockId was not found in $testStoreName")
- testStore.releaseLock(blockId)
+ val blockResultOpt = testStore.getLocalValues(blockId)
+ assert(blockResultOpt.isDefined, s"$blockId was not found in $testStoreName")
+ val localValues = blockResultOpt.get.data.toSeq
+ assert(localValues.size == 1)
+ assert(localValues.head === testValue)
assert(master.getLocations(blockId).map(_.executorId).toSet.contains(testStoreName),
s"master does not have status for ${blockId.name} in $testStoreName")
+ val memoryStore = testStore.memoryStore
+ if (memoryStore.contains(blockId) && !storageLevel.deserialized) {
+ memoryStore.getBytes(blockId).get.chunks.foreach { byteBuffer =>
+ assert(storageLevel.useOffHeap == byteBuffer.isDirect,
+ s"memory mode ${storageLevel.memoryMode} is not compatible with " +
+ byteBuffer.getClass.getSimpleName)
+ }
+ }
+
val blockStatus = master.getBlockStatus(blockId)(testStore.blockManagerId)
// Assert that block status in the master for this store has expected storage level