diff options
Diffstat (limited to 'core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala')
-rw-r--r-- | core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala | 20 |
1 files changed, 16 insertions, 4 deletions
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala index 75dc04038d..d907add920 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala @@ -374,7 +374,8 @@ trait BlockManagerReplicationBehavior extends SparkFunSuite // Put the block into one of the stores val blockId = new TestBlockId( "block-with-" + storageLevel.description.replace(" ", "-").toLowerCase) - stores(0).putSingle(blockId, new Array[Byte](blockSize), storageLevel) + val testValue = Array.fill[Byte](blockSize)(1) + stores(0).putSingle(blockId, testValue, storageLevel) // Assert that master know two locations for the block val blockLocations = master.getLocations(blockId).map(_.executorId).toSet @@ -386,12 +387,23 @@ trait BlockManagerReplicationBehavior extends SparkFunSuite testStore => blockLocations.contains(testStore.blockManagerId.executorId) }.foreach { testStore => val testStoreName = testStore.blockManagerId.executorId - assert( - testStore.getLocalValues(blockId).isDefined, s"$blockId was not found in $testStoreName") - testStore.releaseLock(blockId) + val blockResultOpt = testStore.getLocalValues(blockId) + assert(blockResultOpt.isDefined, s"$blockId was not found in $testStoreName") + val localValues = blockResultOpt.get.data.toSeq + assert(localValues.size == 1) + assert(localValues.head === testValue) assert(master.getLocations(blockId).map(_.executorId).toSet.contains(testStoreName), s"master does not have status for ${blockId.name} in $testStoreName") + val memoryStore = testStore.memoryStore + if (memoryStore.contains(blockId) && !storageLevel.deserialized) { + memoryStore.getBytes(blockId).get.chunks.foreach { byteBuffer => + assert(storageLevel.useOffHeap == byteBuffer.isDirect, + s"memory mode ${storageLevel.memoryMode} is not compatible with " + + byteBuffer.getClass.getSimpleName) + } + } + val blockStatus = master.getBlockStatus(blockId)(testStore.blockManagerId) // Assert that block status in the master for this store has expected storage level |