diff options
Diffstat (limited to 'core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala')
-rw-r--r-- | core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala | 22 |
1 files changed, 11 insertions, 11 deletions
diff --git a/core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala b/core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala index 43e832dc02..145d432afe 100644 --- a/core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala @@ -27,7 +27,7 @@ import scala.reflect.ClassTag import org.scalatest._ import org.apache.spark._ -import org.apache.spark.memory.StaticMemoryManager +import org.apache.spark.memory.{MemoryMode, StaticMemoryManager} import org.apache.spark.serializer.{KryoSerializer, SerializerManager} import org.apache.spark.storage.memory.{BlockEvictionHandler, MemoryStore, PartiallySerializedBlock, PartiallyUnrolledIterator} import org.apache.spark.util._ @@ -86,7 +86,7 @@ class MemoryStoreSuite assert(memoryStore.currentUnrollMemoryForThisTask === 0) def reserveUnrollMemoryForThisTask(memory: Long): Boolean = { - memoryStore.reserveUnrollMemoryForThisTask(TestBlockId(""), memory) + memoryStore.reserveUnrollMemoryForThisTask(TestBlockId(""), memory, MemoryMode.ON_HEAP) } // Reserve @@ -99,9 +99,9 @@ class MemoryStoreSuite assert(!reserveUnrollMemoryForThisTask(1000000)) assert(memoryStore.currentUnrollMemoryForThisTask === 800) // not granted // Release - memoryStore.releaseUnrollMemoryForThisTask(100) + memoryStore.releaseUnrollMemoryForThisTask(MemoryMode.ON_HEAP, 100) assert(memoryStore.currentUnrollMemoryForThisTask === 700) - memoryStore.releaseUnrollMemoryForThisTask(100) + memoryStore.releaseUnrollMemoryForThisTask(MemoryMode.ON_HEAP, 100) assert(memoryStore.currentUnrollMemoryForThisTask === 600) // Reserve again assert(reserveUnrollMemoryForThisTask(4400)) @@ -109,9 +109,9 @@ class MemoryStoreSuite assert(!reserveUnrollMemoryForThisTask(20000)) assert(memoryStore.currentUnrollMemoryForThisTask === 5000) // not granted // Release again - memoryStore.releaseUnrollMemoryForThisTask(1000) + memoryStore.releaseUnrollMemoryForThisTask(MemoryMode.ON_HEAP, 1000) assert(memoryStore.currentUnrollMemoryForThisTask === 4000) - memoryStore.releaseUnrollMemoryForThisTask() // release all + memoryStore.releaseUnrollMemoryForThisTask(MemoryMode.ON_HEAP) // release all assert(memoryStore.currentUnrollMemoryForThisTask === 0) } @@ -254,7 +254,7 @@ class MemoryStoreSuite assert(blockInfoManager.lockNewBlockForWriting( blockId, new BlockInfo(StorageLevel.MEMORY_ONLY_SER, classTag, tellMaster = false))) - val res = memoryStore.putIteratorAsBytes(blockId, iter, classTag) + val res = memoryStore.putIteratorAsBytes(blockId, iter, classTag, MemoryMode.ON_HEAP) blockInfoManager.unlock(blockId) res } @@ -312,7 +312,7 @@ class MemoryStoreSuite assert(blockInfoManager.lockNewBlockForWriting( "b1", new BlockInfo(StorageLevel.MEMORY_ONLY_SER, ClassTag.Any, tellMaster = false))) - val res = memoryStore.putIteratorAsBytes("b1", bigIterator, ClassTag.Any) + val res = memoryStore.putIteratorAsBytes("b1", bigIterator, ClassTag.Any, MemoryMode.ON_HEAP) blockInfoManager.unlock("b1") assert(res.isLeft) assert(memoryStore.currentUnrollMemoryForThisTask > 0) @@ -333,7 +333,7 @@ class MemoryStoreSuite assert(blockInfoManager.lockNewBlockForWriting( "b1", new BlockInfo(StorageLevel.MEMORY_ONLY_SER, ClassTag.Any, tellMaster = false))) - val res = memoryStore.putIteratorAsBytes("b1", bigIterator, ClassTag.Any) + val res = memoryStore.putIteratorAsBytes("b1", bigIterator, ClassTag.Any, MemoryMode.ON_HEAP) blockInfoManager.unlock("b1") assert(res.isLeft) assert(memoryStore.currentUnrollMemoryForThisTask > 0) @@ -395,7 +395,7 @@ class MemoryStoreSuite val blockId = BlockId("rdd_3_10") blockInfoManager.lockNewBlockForWriting( blockId, new BlockInfo(StorageLevel.MEMORY_ONLY, ClassTag.Any, tellMaster = false)) - memoryStore.putBytes(blockId, 13000, () => { + memoryStore.putBytes(blockId, 13000, MemoryMode.ON_HEAP, () => { fail("A big ByteBuffer that cannot be put into MemoryStore should not be created") }) } @@ -404,7 +404,7 @@ class MemoryStoreSuite val (memoryStore, _) = makeMemoryStore(12000) val blockId = BlockId("rdd_3_10") var bytes: ChunkedByteBuffer = null - memoryStore.putBytes(blockId, 10000, () => { + memoryStore.putBytes(blockId, 10000, MemoryMode.ON_HEAP, () => { bytes = new ChunkedByteBuffer(ByteBuffer.allocate(10000)) bytes }) |