diff options
author | Josh Rosen <joshrosen@databricks.com> | 2016-03-23 10:15:23 -0700 |
---|---|---|
committer | Josh Rosen <joshrosen@databricks.com> | 2016-03-23 10:15:23 -0700 |
commit | 3de24ae2ed6c58fc96a7e50832afe42fe7af34fb (patch) | |
tree | 0eb9f5d7100301195e6d0c1b77114e2398f6edb3 /core/src/test | |
parent | 6ce008ba46aa1fc8a5c222ce0f25a6d81f53588e (diff) | |
download | spark-3de24ae2ed6c58fc96a7e50832afe42fe7af34fb.tar.gz spark-3de24ae2ed6c58fc96a7e50832afe42fe7af34fb.tar.bz2 spark-3de24ae2ed6c58fc96a7e50832afe42fe7af34fb.zip |
[SPARK-14075] Refactor MemoryStore to be testable independent of BlockManager
This patch refactors the `MemoryStore` so that it can be tested without needing to construct / mock an entire `BlockManager`.
- The block manager's serialization- and compression-related methods have been moved from `BlockManager` to `SerializerManager`.
- `BlockInfoManager `is now passed directly to classes that need it, rather than being passed via the `BlockManager`.
- The `MemoryStore` now calls `dropFromMemory` via a new `BlockEvictionHandler` interface rather than directly calling the `BlockManager`. This change helps to enforce a narrow interface between the `MemoryStore` and `BlockManager` functionality and makes this interface easier to mock in tests.
- Several of the block unrolling tests have been moved from `BlockManagerSuite` into a new `MemoryStoreSuite`.
Author: Josh Rosen <joshrosen@databricks.com>
Closes #11899 from JoshRosen/reduce-memorystore-blockmanager-coupling.
Diffstat (limited to 'core/src/test')
7 files changed, 333 insertions, 252 deletions
diff --git a/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java b/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java index 47c695ad4e..44733dcdaf 100644 --- a/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java +++ b/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java @@ -70,6 +70,7 @@ public class UnsafeShuffleWriterSuite { final LinkedList<File> spillFilesCreated = new LinkedList<>(); SparkConf conf; final Serializer serializer = new KryoSerializer(new SparkConf()); + final SerializerManager serializerManager = new SerializerManager(serializer, new SparkConf()); TaskMetrics taskMetrics; @Mock(answer = RETURNS_SMART_NULLS) BlockManager blockManager; @@ -111,7 +112,7 @@ public class UnsafeShuffleWriterSuite { .set("spark.memory.offHeap.enabled", "false"); taskMetrics = new TaskMetrics(); memoryManager = new TestMemoryManager(conf); - taskMemoryManager = new TaskMemoryManager(memoryManager, 0); + taskMemoryManager = new TaskMemoryManager(memoryManager, 0); when(blockManager.diskBlockManager()).thenReturn(diskBlockManager); when(blockManager.getDiskWriter( @@ -135,35 +136,6 @@ public class UnsafeShuffleWriterSuite { ); } }); - when(blockManager.wrapForCompression(any(BlockId.class), any(InputStream.class))).thenAnswer( - new Answer<InputStream>() { - @Override - public InputStream answer(InvocationOnMock invocation) throws Throwable { - assertTrue(invocation.getArguments()[0] instanceof TempShuffleBlockId); - InputStream is = (InputStream) invocation.getArguments()[1]; - if (conf.getBoolean("spark.shuffle.compress", true)) { - return CompressionCodec$.MODULE$.createCodec(conf).compressedInputStream(is); - } else { - return is; - } - } - } - ); - - when(blockManager.wrapForCompression(any(BlockId.class), any(OutputStream.class))).thenAnswer( - new Answer<OutputStream>() { - @Override - public OutputStream answer(InvocationOnMock invocation) throws Throwable { - assertTrue(invocation.getArguments()[0] instanceof TempShuffleBlockId); - OutputStream os = (OutputStream) invocation.getArguments()[1]; - if (conf.getBoolean("spark.shuffle.compress", true)) { - return CompressionCodec$.MODULE$.createCodec(conf).compressedOutputStream(os); - } else { - return os; - } - } - } - ); when(shuffleBlockResolver.getDataFile(anyInt(), anyInt())).thenReturn(mergedOutputFile); doAnswer(new Answer<Void>() { diff --git a/core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java b/core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java index 6667179b9d..449fb45c30 100644 --- a/core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java +++ b/core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java @@ -19,7 +19,6 @@ package org.apache.spark.unsafe.map; import java.io.File; import java.io.IOException; -import java.io.InputStream; import java.io.OutputStream; import java.nio.ByteBuffer; import java.util.*; @@ -42,7 +41,9 @@ import org.apache.spark.executor.ShuffleWriteMetrics; import org.apache.spark.memory.TaskMemoryManager; import org.apache.spark.memory.TestMemoryManager; import org.apache.spark.network.util.JavaUtils; +import org.apache.spark.serializer.JavaSerializer; import org.apache.spark.serializer.SerializerInstance; +import org.apache.spark.serializer.SerializerManager; import org.apache.spark.storage.*; import org.apache.spark.unsafe.Platform; import org.apache.spark.unsafe.array.ByteArrayMethods; @@ -51,7 +52,6 @@ import org.apache.spark.util.Utils; import static org.hamcrest.Matchers.greaterThan; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; -import static org.mockito.AdditionalAnswers.returnsSecondArg; import static org.mockito.Answers.RETURNS_SMART_NULLS; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyInt; @@ -64,6 +64,9 @@ public abstract class AbstractBytesToBytesMapSuite { private TestMemoryManager memoryManager; private TaskMemoryManager taskMemoryManager; + private SerializerManager serializerManager = new SerializerManager( + new JavaSerializer(new SparkConf()), + new SparkConf().set("spark.shuffle.spill.compress", "false")); private static final long PAGE_SIZE_BYTES = 1L << 26; // 64 megabytes final LinkedList<File> spillFilesCreated = new LinkedList<>(); @@ -85,7 +88,9 @@ public abstract class AbstractBytesToBytesMapSuite { new TestMemoryManager( new SparkConf() .set("spark.memory.offHeap.enabled", "" + useOffHeapMemoryAllocator()) - .set("spark.memory.offHeap.size", "256mb")); + .set("spark.memory.offHeap.size", "256mb") + .set("spark.shuffle.spill.compress", "false") + .set("spark.shuffle.compress", "false")); taskMemoryManager = new TaskMemoryManager(memoryManager, 0); tempDir = Utils.createTempDir(System.getProperty("java.io.tmpdir"), "unsafe-test"); @@ -124,8 +129,6 @@ public abstract class AbstractBytesToBytesMapSuite { ); } }); - when(blockManager.wrapForCompression(any(BlockId.class), any(InputStream.class))) - .then(returnsSecondArg()); } @After @@ -546,8 +549,8 @@ public abstract class AbstractBytesToBytesMapSuite { @Test public void spillInIterator() throws IOException { - BytesToBytesMap map = - new BytesToBytesMap(taskMemoryManager, blockManager, 1, 0.75, 1024, false); + BytesToBytesMap map = new BytesToBytesMap( + taskMemoryManager, blockManager, serializerManager, 1, 0.75, 1024, false); try { int i; for (i = 0; i < 1024; i++) { diff --git a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java index db50e551f2..a2253d8559 100644 --- a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java +++ b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java @@ -19,7 +19,6 @@ package org.apache.spark.util.collection.unsafe.sort; import java.io.File; import java.io.IOException; -import java.io.InputStream; import java.io.OutputStream; import java.util.Arrays; import java.util.LinkedList; @@ -43,14 +42,15 @@ import org.apache.spark.executor.ShuffleWriteMetrics; import org.apache.spark.executor.TaskMetrics; import org.apache.spark.memory.TestMemoryManager; import org.apache.spark.memory.TaskMemoryManager; +import org.apache.spark.serializer.JavaSerializer; import org.apache.spark.serializer.SerializerInstance; +import org.apache.spark.serializer.SerializerManager; import org.apache.spark.storage.*; import org.apache.spark.unsafe.Platform; import org.apache.spark.util.Utils; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.junit.Assert.*; -import static org.mockito.AdditionalAnswers.returnsSecondArg; import static org.mockito.Answers.RETURNS_SMART_NULLS; import static org.mockito.Mockito.*; @@ -60,6 +60,9 @@ public class UnsafeExternalSorterSuite { final TestMemoryManager memoryManager = new TestMemoryManager(new SparkConf().set("spark.memory.offHeap.enabled", "false")); final TaskMemoryManager taskMemoryManager = new TaskMemoryManager(memoryManager, 0); + final SerializerManager serializerManager = new SerializerManager( + new JavaSerializer(new SparkConf()), + new SparkConf().set("spark.shuffle.spill.compress", "false")); // Use integer comparison for comparing prefixes (which are partition ids, in this case) final PrefixComparator prefixComparator = new PrefixComparator() { @Override @@ -135,8 +138,6 @@ public class UnsafeExternalSorterSuite { ); } }); - when(blockManager.wrapForCompression(any(BlockId.class), any(InputStream.class))) - .then(returnsSecondArg()); } @After @@ -172,6 +173,7 @@ public class UnsafeExternalSorterSuite { return UnsafeExternalSorter.create( taskMemoryManager, blockManager, + serializerManager, taskContext, recordComparator, prefixComparator, @@ -374,6 +376,7 @@ public class UnsafeExternalSorterSuite { final UnsafeExternalSorter sorter = UnsafeExternalSorter.create( taskMemoryManager, blockManager, + serializerManager, taskContext, null, null, @@ -408,6 +411,7 @@ public class UnsafeExternalSorterSuite { final UnsafeExternalSorter sorter = UnsafeExternalSorter.create( taskMemoryManager, blockManager, + serializerManager, taskContext, recordComparator, prefixComparator, diff --git a/core/src/test/scala/org/apache/spark/DistributedSuite.scala b/core/src/test/scala/org/apache/spark/DistributedSuite.scala index 2732cd6749..3dded4d486 100644 --- a/core/src/test/scala/org/apache/spark/DistributedSuite.scala +++ b/core/src/test/scala/org/apache/spark/DistributedSuite.scala @@ -194,10 +194,11 @@ class DistributedSuite extends SparkFunSuite with Matchers with LocalSparkContex val blockId = blockIds(0) val blockManager = SparkEnv.get.blockManager val blockTransfer = SparkEnv.get.blockTransferService + val serializerManager = SparkEnv.get.serializerManager blockManager.master.getLocations(blockId).foreach { cmId => val bytes = blockTransfer.fetchBlockSync(cmId.host, cmId.port, cmId.executorId, blockId.toString) - val deserialized = blockManager.dataDeserialize[Int](blockId, + val deserialized = serializerManager.dataDeserialize[Int](blockId, new ChunkedByteBuffer(bytes.nioByteBuffer())).toList assert(deserialized === (1 to 100).toList) } diff --git a/core/src/test/scala/org/apache/spark/shuffle/BlockStoreShuffleReaderSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/BlockStoreShuffleReaderSuite.scala index 08f52c92e1..dba1172d5f 100644 --- a/core/src/test/scala/org/apache/spark/shuffle/BlockStoreShuffleReaderSuite.scala +++ b/core/src/test/scala/org/apache/spark/shuffle/BlockStoreShuffleReaderSuite.scala @@ -20,14 +20,11 @@ package org.apache.spark.shuffle import java.io.{ByteArrayOutputStream, InputStream} import java.nio.ByteBuffer -import org.mockito.Matchers.{eq => meq, _} import org.mockito.Mockito.{mock, when} -import org.mockito.invocation.InvocationOnMock -import org.mockito.stubbing.Answer import org.apache.spark._ import org.apache.spark.network.buffer.{ManagedBuffer, NioManagedBuffer} -import org.apache.spark.serializer.JavaSerializer +import org.apache.spark.serializer.{JavaSerializer, SerializerManager} import org.apache.spark.storage.{BlockManager, BlockManagerId, ShuffleBlockId} /** @@ -77,13 +74,6 @@ class BlockStoreShuffleReaderSuite extends SparkFunSuite with LocalSparkContext // can ensure retain() and release() are properly called. val blockManager = mock(classOf[BlockManager]) - // Create a return function to use for the mocked wrapForCompression method that just returns - // the original input stream. - val dummyCompressionFunction = new Answer[InputStream] { - override def answer(invocation: InvocationOnMock): InputStream = - invocation.getArguments()(1).asInstanceOf[InputStream] - } - // Create a buffer with some randomly generated key-value pairs to use as the shuffle data // from each mappers (all mappers return the same shuffle data). val byteOutputStream = new ByteArrayOutputStream() @@ -105,9 +95,6 @@ class BlockStoreShuffleReaderSuite extends SparkFunSuite with LocalSparkContext // fetch shuffle data. val shuffleBlockId = ShuffleBlockId(shuffleId, mapId, reduceId) when(blockManager.getBlockData(shuffleBlockId)).thenReturn(managedBuffer) - when(blockManager.wrapForCompression(meq(shuffleBlockId), isA(classOf[InputStream]))) - .thenAnswer(dummyCompressionFunction) - managedBuffer } @@ -133,11 +120,18 @@ class BlockStoreShuffleReaderSuite extends SparkFunSuite with LocalSparkContext new BaseShuffleHandle(shuffleId, numMaps, dependency) } + val serializerManager = new SerializerManager( + serializer, + new SparkConf() + .set("spark.shuffle.compress", "false") + .set("spark.shuffle.spill.compress", "false")) + val shuffleReader = new BlockStoreShuffleReader( shuffleHandle, reduceId, reduceId + 1, TaskContext.empty(), + serializerManager, blockManager, mapOutputTracker) diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala index 9419dfaa00..94f6f87740 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala @@ -1033,138 +1033,6 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE assert(!store.memoryStore.contains(rdd(1, 0)), "rdd_1_0 was in store") } - test("reserve/release unroll memory") { - store = makeBlockManager(12000) - val memoryStore = store.memoryStore - assert(memoryStore.currentUnrollMemory === 0) - assert(memoryStore.currentUnrollMemoryForThisTask === 0) - - def reserveUnrollMemoryForThisTask(memory: Long): Boolean = { - memoryStore.reserveUnrollMemoryForThisTask(TestBlockId(""), memory) - } - - // Reserve - assert(reserveUnrollMemoryForThisTask(100)) - assert(memoryStore.currentUnrollMemoryForThisTask === 100) - assert(reserveUnrollMemoryForThisTask(200)) - assert(memoryStore.currentUnrollMemoryForThisTask === 300) - assert(reserveUnrollMemoryForThisTask(500)) - assert(memoryStore.currentUnrollMemoryForThisTask === 800) - assert(!reserveUnrollMemoryForThisTask(1000000)) - assert(memoryStore.currentUnrollMemoryForThisTask === 800) // not granted - // Release - memoryStore.releaseUnrollMemoryForThisTask(100) - assert(memoryStore.currentUnrollMemoryForThisTask === 700) - memoryStore.releaseUnrollMemoryForThisTask(100) - assert(memoryStore.currentUnrollMemoryForThisTask === 600) - // Reserve again - assert(reserveUnrollMemoryForThisTask(4400)) - assert(memoryStore.currentUnrollMemoryForThisTask === 5000) - assert(!reserveUnrollMemoryForThisTask(20000)) - assert(memoryStore.currentUnrollMemoryForThisTask === 5000) // not granted - // Release again - memoryStore.releaseUnrollMemoryForThisTask(1000) - assert(memoryStore.currentUnrollMemoryForThisTask === 4000) - memoryStore.releaseUnrollMemoryForThisTask() // release all - assert(memoryStore.currentUnrollMemoryForThisTask === 0) - } - - test("safely unroll blocks") { - store = makeBlockManager(12000) - val smallList = List.fill(40)(new Array[Byte](100)) - val bigList = List.fill(40)(new Array[Byte](1000)) - val memoryStore = store.memoryStore - assert(memoryStore.currentUnrollMemoryForThisTask === 0) - - // Unroll with all the space in the world. This should succeed. - var putResult = - memoryStore.putIterator("unroll", smallList.iterator, StorageLevel.MEMORY_ONLY, ClassTag.Any) - assert(putResult.isRight) - assert(memoryStore.currentUnrollMemoryForThisTask === 0) - smallList.iterator.zip(memoryStore.getValues("unroll").get).foreach { case (e, a) => - assert(e === a, "getValues() did not return original values!") - } - assert(memoryStore.remove("unroll")) - - // Unroll with not enough space. This should succeed after kicking out someBlock1. - assert(store.putIterator("someBlock1", smallList.iterator, StorageLevel.MEMORY_ONLY)) - assert(store.putIterator("someBlock2", smallList.iterator, StorageLevel.MEMORY_ONLY)) - putResult = - memoryStore.putIterator("unroll", smallList.iterator, StorageLevel.MEMORY_ONLY, ClassTag.Any) - assert(putResult.isRight) - assert(memoryStore.currentUnrollMemoryForThisTask === 0) - assert(memoryStore.contains("someBlock2")) - assert(!memoryStore.contains("someBlock1")) - smallList.iterator.zip(memoryStore.getValues("unroll").get).foreach { case (e, a) => - assert(e === a, "getValues() did not return original values!") - } - assert(memoryStore.remove("unroll")) - - // Unroll huge block with not enough space. Even after ensuring free space of 12000 * 0.4 = - // 4800 bytes, there is still not enough room to unroll this block. This returns an iterator. - // In the mean time, however, we kicked out someBlock2 before giving up. - assert(store.putIterator("someBlock3", smallList.iterator, StorageLevel.MEMORY_ONLY)) - putResult = - memoryStore.putIterator("unroll", bigList.iterator, StorageLevel.MEMORY_ONLY, ClassTag.Any) - assert(memoryStore.currentUnrollMemoryForThisTask > 0) // we returned an iterator - assert(!memoryStore.contains("someBlock2")) - assert(putResult.isLeft) - bigList.iterator.zip(putResult.left.get).foreach { case (e, a) => - assert(e === a, "putIterator() did not return original values!") - } - // The unroll memory was freed once the iterator returned by putIterator() was fully traversed. - assert(memoryStore.currentUnrollMemoryForThisTask === 0) - } - - test("safely unroll blocks through putIterator") { - store = makeBlockManager(12000) - val memOnly = StorageLevel.MEMORY_ONLY - val memoryStore = store.memoryStore - val smallList = List.fill(40)(new Array[Byte](100)) - val bigList = List.fill(40)(new Array[Byte](1000)) - def smallIterator: Iterator[Any] = smallList.iterator.asInstanceOf[Iterator[Any]] - def bigIterator: Iterator[Any] = bigList.iterator.asInstanceOf[Iterator[Any]] - assert(memoryStore.currentUnrollMemoryForThisTask === 0) - - // Unroll with plenty of space. This should succeed and cache both blocks. - val result1 = memoryStore.putIterator("b1", smallIterator, memOnly, ClassTag.Any) - val result2 = memoryStore.putIterator("b2", smallIterator, memOnly, ClassTag.Any) - assert(memoryStore.contains("b1")) - assert(memoryStore.contains("b2")) - assert(result1.isRight) // unroll was successful - assert(result2.isRight) - assert(memoryStore.currentUnrollMemoryForThisTask === 0) - - // Re-put these two blocks so block manager knows about them too. Otherwise, block manager - // would not know how to drop them from memory later. - memoryStore.remove("b1") - memoryStore.remove("b2") - store.putIterator("b1", smallIterator, memOnly) - store.putIterator("b2", smallIterator, memOnly) - - // Unroll with not enough space. This should succeed but kick out b1 in the process. - val result3 = memoryStore.putIterator("b3", smallIterator, memOnly, ClassTag.Any) - assert(result3.isRight) - assert(!memoryStore.contains("b1")) - assert(memoryStore.contains("b2")) - assert(memoryStore.contains("b3")) - assert(memoryStore.currentUnrollMemoryForThisTask === 0) - memoryStore.remove("b3") - store.putIterator("b3", smallIterator, memOnly) - - // Unroll huge block with not enough space. This should fail and kick out b2 in the process. - val result4 = memoryStore.putIterator("b4", bigIterator, memOnly, ClassTag.Any) - assert(result4.isLeft) // unroll was unsuccessful - assert(!memoryStore.contains("b1")) - assert(!memoryStore.contains("b2")) - assert(memoryStore.contains("b3")) - assert(!memoryStore.contains("b4")) - assert(memoryStore.currentUnrollMemoryForThisTask > 0) // we returned an iterator - } - - /** - * This test is essentially identical to the preceding one, except that it uses MEMORY_AND_DISK. - */ test("safely unroll blocks through putIterator (disk)") { store = makeBlockManager(12000) val memAndDisk = StorageLevel.MEMORY_AND_DISK @@ -1203,72 +1071,9 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE assert(!memoryStore.contains("b2")) assert(memoryStore.contains("b3")) assert(!memoryStore.contains("b4")) - assert(memoryStore.currentUnrollMemoryForThisTask > 0) // we returned an iterator - result4.left.get.close() - assert(memoryStore.currentUnrollMemoryForThisTask === 0) // close released the unroll memory - } - - test("multiple unrolls by the same thread") { - store = makeBlockManager(12000) - val memOnly = StorageLevel.MEMORY_ONLY - val memoryStore = store.memoryStore - val smallList = List.fill(40)(new Array[Byte](100)) - def smallIterator: Iterator[Any] = smallList.iterator.asInstanceOf[Iterator[Any]] - assert(memoryStore.currentUnrollMemoryForThisTask === 0) - - // All unroll memory used is released because putIterator did not return an iterator - assert(memoryStore.putIterator("b1", smallIterator, memOnly, ClassTag.Any).isRight) - assert(memoryStore.currentUnrollMemoryForThisTask === 0) - assert(memoryStore.putIterator("b2", smallIterator, memOnly, ClassTag.Any).isRight) - assert(memoryStore.currentUnrollMemoryForThisTask === 0) - - // Unroll memory is not released because putIterator returned an iterator - // that still depends on the underlying vector used in the process - assert(memoryStore.putIterator("b3", smallIterator, memOnly, ClassTag.Any).isLeft) - val unrollMemoryAfterB3 = memoryStore.currentUnrollMemoryForThisTask - assert(unrollMemoryAfterB3 > 0) - - // The unroll memory owned by this thread builds on top of its value after the previous unrolls - assert(memoryStore.putIterator("b4", smallIterator, memOnly, ClassTag.Any).isLeft) - val unrollMemoryAfterB4 = memoryStore.currentUnrollMemoryForThisTask - assert(unrollMemoryAfterB4 > unrollMemoryAfterB3) - - // ... but only to a certain extent (until we run out of free space to grant new unroll memory) - assert(memoryStore.putIterator("b5", smallIterator, memOnly, ClassTag.Any).isLeft) - val unrollMemoryAfterB5 = memoryStore.currentUnrollMemoryForThisTask - assert(memoryStore.putIterator("b6", smallIterator, memOnly, ClassTag.Any).isLeft) - val unrollMemoryAfterB6 = memoryStore.currentUnrollMemoryForThisTask - assert(memoryStore.putIterator("b7", smallIterator, memOnly, ClassTag.Any).isLeft) - val unrollMemoryAfterB7 = memoryStore.currentUnrollMemoryForThisTask - assert(unrollMemoryAfterB5 === unrollMemoryAfterB4) - assert(unrollMemoryAfterB6 === unrollMemoryAfterB4) - assert(unrollMemoryAfterB7 === unrollMemoryAfterB4) - } - - test("lazily create a big ByteBuffer to avoid OOM if it cannot be put into MemoryStore") { - store = makeBlockManager(12000) - val memoryStore = store.memoryStore - val blockId = BlockId("rdd_3_10") - store.blockInfoManager.lockNewBlockForWriting( - blockId, new BlockInfo(StorageLevel.MEMORY_ONLY, ClassTag.Any, tellMaster = false)) - memoryStore.putBytes(blockId, 13000, () => { - fail("A big ByteBuffer that cannot be put into MemoryStore should not be created") - }) - } - - test("put a small ByteBuffer to MemoryStore") { - store = makeBlockManager(12000) - val memoryStore = store.memoryStore - val blockId = BlockId("rdd_3_10") - var bytes: ChunkedByteBuffer = null - memoryStore.putBytes(blockId, 10000, () => { - bytes = new ChunkedByteBuffer(ByteBuffer.allocate(10000)) - bytes - }) - assert(memoryStore.getSize(blockId) === 10000) } - test("read-locked blocks cannot be evicted from the MemoryStore") { + test("read-locked blocks cannot be evicted from memory") { store = makeBlockManager(12000) val arr = new Array[Byte](4000) // First store a1 and a2, both in memory, and a3, on disk only diff --git a/core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala b/core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala new file mode 100644 index 0000000000..b4ab67ca15 --- /dev/null +++ b/core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala @@ -0,0 +1,302 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.storage + +import java.nio.ByteBuffer + +import scala.language.implicitConversions +import scala.language.postfixOps +import scala.language.reflectiveCalls +import scala.reflect.ClassTag + +import org.scalatest._ + +import org.apache.spark._ +import org.apache.spark.memory.StaticMemoryManager +import org.apache.spark.serializer.{KryoSerializer, SerializerManager} +import org.apache.spark.storage.memory.{BlockEvictionHandler, MemoryStore, PartiallyUnrolledIterator} +import org.apache.spark.util._ +import org.apache.spark.util.io.ChunkedByteBuffer + +class MemoryStoreSuite + extends SparkFunSuite + with PrivateMethodTester + with BeforeAndAfterEach + with ResetSystemProperties { + + var conf: SparkConf = new SparkConf(false) + .set("spark.test.useCompressedOops", "true") + .set("spark.storage.unrollFraction", "0.4") + .set("spark.storage.unrollMemoryThreshold", "512") + + // Reuse a serializer across tests to avoid creating a new thread-local buffer on each test + val serializer = new KryoSerializer(new SparkConf(false).set("spark.kryoserializer.buffer", "1m")) + + // Implicitly convert strings to BlockIds for test clarity. + implicit def StringToBlockId(value: String): BlockId = new TestBlockId(value) + def rdd(rddId: Int, splitId: Int): RDDBlockId = RDDBlockId(rddId, splitId) + + override def beforeEach(): Unit = { + super.beforeEach() + // Set the arch to 64-bit and compressedOops to true to get a deterministic test-case + System.setProperty("os.arch", "amd64") + val initialize = PrivateMethod[Unit]('initialize) + SizeEstimator invokePrivate initialize() + } + + def makeMemoryStore(maxMem: Long): (MemoryStore, BlockInfoManager) = { + val memManager = new StaticMemoryManager(conf, Long.MaxValue, maxMem, numCores = 1) + val serializerManager = new SerializerManager(serializer, conf) + val blockInfoManager = new BlockInfoManager + val blockEvictionHandler = new BlockEvictionHandler { + var memoryStore: MemoryStore = _ + override private[storage] def dropFromMemory[T: ClassTag]( + blockId: BlockId, + data: () => Either[Array[T], ChunkedByteBuffer]): StorageLevel = { + memoryStore.remove(blockId) + StorageLevel.NONE + } + } + val memoryStore = + new MemoryStore(conf, blockInfoManager, serializerManager, memManager, blockEvictionHandler) + memManager.setMemoryStore(memoryStore) + blockEvictionHandler.memoryStore = memoryStore + (memoryStore, blockInfoManager) + } + + test("reserve/release unroll memory") { + val (memoryStore, _) = makeMemoryStore(12000) + assert(memoryStore.currentUnrollMemory === 0) + assert(memoryStore.currentUnrollMemoryForThisTask === 0) + + def reserveUnrollMemoryForThisTask(memory: Long): Boolean = { + memoryStore.reserveUnrollMemoryForThisTask(TestBlockId(""), memory) + } + + // Reserve + assert(reserveUnrollMemoryForThisTask(100)) + assert(memoryStore.currentUnrollMemoryForThisTask === 100) + assert(reserveUnrollMemoryForThisTask(200)) + assert(memoryStore.currentUnrollMemoryForThisTask === 300) + assert(reserveUnrollMemoryForThisTask(500)) + assert(memoryStore.currentUnrollMemoryForThisTask === 800) + assert(!reserveUnrollMemoryForThisTask(1000000)) + assert(memoryStore.currentUnrollMemoryForThisTask === 800) // not granted + // Release + memoryStore.releaseUnrollMemoryForThisTask(100) + assert(memoryStore.currentUnrollMemoryForThisTask === 700) + memoryStore.releaseUnrollMemoryForThisTask(100) + assert(memoryStore.currentUnrollMemoryForThisTask === 600) + // Reserve again + assert(reserveUnrollMemoryForThisTask(4400)) + assert(memoryStore.currentUnrollMemoryForThisTask === 5000) + assert(!reserveUnrollMemoryForThisTask(20000)) + assert(memoryStore.currentUnrollMemoryForThisTask === 5000) // not granted + // Release again + memoryStore.releaseUnrollMemoryForThisTask(1000) + assert(memoryStore.currentUnrollMemoryForThisTask === 4000) + memoryStore.releaseUnrollMemoryForThisTask() // release all + assert(memoryStore.currentUnrollMemoryForThisTask === 0) + } + + test("safely unroll blocks") { + val smallList = List.fill(40)(new Array[Byte](100)) + val bigList = List.fill(40)(new Array[Byte](1000)) + val ct = implicitly[ClassTag[Array[Byte]]] + val (memoryStore, blockInfoManager) = makeMemoryStore(12000) + assert(memoryStore.currentUnrollMemoryForThisTask === 0) + + def putIterator[T]( + blockId: BlockId, + iter: Iterator[T], + classTag: ClassTag[T]): Either[PartiallyUnrolledIterator[T], Long] = { + assert(blockInfoManager.lockNewBlockForWriting( + blockId, + new BlockInfo(StorageLevel.MEMORY_ONLY, classTag, tellMaster = false))) + val res = memoryStore.putIterator(blockId, iter, StorageLevel.MEMORY_ONLY, classTag) + blockInfoManager.unlock(blockId) + res + } + + // Unroll with all the space in the world. This should succeed. + var putResult = putIterator("unroll", smallList.iterator, ClassTag.Any) + assert(putResult.isRight) + assert(memoryStore.currentUnrollMemoryForThisTask === 0) + smallList.iterator.zip(memoryStore.getValues("unroll").get).foreach { case (e, a) => + assert(e === a, "getValues() did not return original values!") + } + blockInfoManager.lockForWriting("unroll") + assert(memoryStore.remove("unroll")) + blockInfoManager.removeBlock("unroll") + + // Unroll with not enough space. This should succeed after kicking out someBlock1. + assert(putIterator("someBlock1", smallList.iterator, ct).isRight) + assert(putIterator("someBlock2", smallList.iterator, ct).isRight) + putResult = putIterator("unroll", smallList.iterator, ClassTag.Any) + assert(putResult.isRight) + assert(memoryStore.currentUnrollMemoryForThisTask === 0) + assert(memoryStore.contains("someBlock2")) + assert(!memoryStore.contains("someBlock1")) + smallList.iterator.zip(memoryStore.getValues("unroll").get).foreach { case (e, a) => + assert(e === a, "getValues() did not return original values!") + } + blockInfoManager.lockForWriting("unroll") + assert(memoryStore.remove("unroll")) + blockInfoManager.removeBlock("unroll") + + // Unroll huge block with not enough space. Even after ensuring free space of 12000 * 0.4 = + // 4800 bytes, there is still not enough room to unroll this block. This returns an iterator. + // In the meantime, however, we kicked out someBlock2 before giving up. + assert(putIterator("someBlock3", smallList.iterator, ct).isRight) + putResult = putIterator("unroll", bigList.iterator, ClassTag.Any) + assert(memoryStore.currentUnrollMemoryForThisTask > 0) // we returned an iterator + assert(!memoryStore.contains("someBlock2")) + assert(putResult.isLeft) + bigList.iterator.zip(putResult.left.get).foreach { case (e, a) => + assert(e === a, "putIterator() did not return original values!") + } + // The unroll memory was freed once the iterator returned by putIterator() was fully traversed. + assert(memoryStore.currentUnrollMemoryForThisTask === 0) + } + + test("safely unroll blocks through putIterator") { + val (memoryStore, blockInfoManager) = makeMemoryStore(12000) + val smallList = List.fill(40)(new Array[Byte](100)) + val bigList = List.fill(40)(new Array[Byte](1000)) + def smallIterator: Iterator[Any] = smallList.iterator.asInstanceOf[Iterator[Any]] + def bigIterator: Iterator[Any] = bigList.iterator.asInstanceOf[Iterator[Any]] + assert(memoryStore.currentUnrollMemoryForThisTask === 0) + + def putIterator[T]( + blockId: BlockId, + iter: Iterator[T], + classTag: ClassTag[T]): Either[PartiallyUnrolledIterator[T], Long] = { + assert(blockInfoManager.lockNewBlockForWriting( + blockId, + new BlockInfo(StorageLevel.MEMORY_ONLY, classTag, tellMaster = false))) + val res = memoryStore.putIterator(blockId, iter, StorageLevel.MEMORY_ONLY, classTag) + blockInfoManager.unlock(blockId) + res + } + + // Unroll with plenty of space. This should succeed and cache both blocks. + val result1 = putIterator("b1", smallIterator, ClassTag.Any) + val result2 = putIterator("b2", smallIterator, ClassTag.Any) + assert(memoryStore.contains("b1")) + assert(memoryStore.contains("b2")) + assert(result1.isRight) // unroll was successful + assert(result2.isRight) + assert(memoryStore.currentUnrollMemoryForThisTask === 0) + + // Re-put these two blocks so block manager knows about them too. Otherwise, block manager + // would not know how to drop them from memory later. + blockInfoManager.lockForWriting("b1") + memoryStore.remove("b1") + blockInfoManager.removeBlock("b1") + blockInfoManager.lockForWriting("b2") + memoryStore.remove("b2") + blockInfoManager.removeBlock("b2") + putIterator("b1", smallIterator, ClassTag.Any) + putIterator("b2", smallIterator, ClassTag.Any) + + // Unroll with not enough space. This should succeed but kick out b1 in the process. + val result3 = putIterator("b3", smallIterator, ClassTag.Any) + assert(result3.isRight) + assert(!memoryStore.contains("b1")) + assert(memoryStore.contains("b2")) + assert(memoryStore.contains("b3")) + assert(memoryStore.currentUnrollMemoryForThisTask === 0) + blockInfoManager.lockForWriting("b3") + assert(memoryStore.remove("b3")) + blockInfoManager.removeBlock("b3") + putIterator("b3", smallIterator, ClassTag.Any) + + // Unroll huge block with not enough space. This should fail and kick out b2 in the process. + val result4 = putIterator("b4", bigIterator, ClassTag.Any) + assert(result4.isLeft) // unroll was unsuccessful + assert(!memoryStore.contains("b1")) + assert(!memoryStore.contains("b2")) + assert(memoryStore.contains("b3")) + assert(!memoryStore.contains("b4")) + assert(memoryStore.currentUnrollMemoryForThisTask > 0) // we returned an iterator + result4.left.get.close() + assert(memoryStore.currentUnrollMemoryForThisTask === 0) // close released the unroll memory + } + + test("multiple unrolls by the same thread") { + val (memoryStore, _) = makeMemoryStore(12000) + val smallList = List.fill(40)(new Array[Byte](100)) + def smallIterator: Iterator[Any] = smallList.iterator.asInstanceOf[Iterator[Any]] + assert(memoryStore.currentUnrollMemoryForThisTask === 0) + + def putIterator( + blockId: BlockId, + iter: Iterator[Any]): Either[PartiallyUnrolledIterator[Any], Long] = { + memoryStore.putIterator(blockId, iter, StorageLevel.MEMORY_ONLY, ClassTag.Any) + } + + // All unroll memory used is released because putIterator did not return an iterator + assert(putIterator("b1", smallIterator).isRight) + assert(memoryStore.currentUnrollMemoryForThisTask === 0) + assert(putIterator("b2", smallIterator).isRight) + assert(memoryStore.currentUnrollMemoryForThisTask === 0) + + // Unroll memory is not released because putIterator returned an iterator + // that still depends on the underlying vector used in the process + assert(putIterator("b3", smallIterator).isLeft) + val unrollMemoryAfterB3 = memoryStore.currentUnrollMemoryForThisTask + assert(unrollMemoryAfterB3 > 0) + + // The unroll memory owned by this thread builds on top of its value after the previous unrolls + assert(putIterator("b4", smallIterator).isLeft) + val unrollMemoryAfterB4 = memoryStore.currentUnrollMemoryForThisTask + assert(unrollMemoryAfterB4 > unrollMemoryAfterB3) + + // ... but only to a certain extent (until we run out of free space to grant new unroll memory) + assert(putIterator("b5", smallIterator).isLeft) + val unrollMemoryAfterB5 = memoryStore.currentUnrollMemoryForThisTask + assert(putIterator("b6", smallIterator).isLeft) + val unrollMemoryAfterB6 = memoryStore.currentUnrollMemoryForThisTask + assert(putIterator("b7", smallIterator).isLeft) + val unrollMemoryAfterB7 = memoryStore.currentUnrollMemoryForThisTask + assert(unrollMemoryAfterB5 === unrollMemoryAfterB4) + assert(unrollMemoryAfterB6 === unrollMemoryAfterB4) + assert(unrollMemoryAfterB7 === unrollMemoryAfterB4) + } + + test("lazily create a big ByteBuffer to avoid OOM if it cannot be put into MemoryStore") { + val (memoryStore, blockInfoManager) = makeMemoryStore(12000) + val blockId = BlockId("rdd_3_10") + blockInfoManager.lockNewBlockForWriting( + blockId, new BlockInfo(StorageLevel.MEMORY_ONLY, ClassTag.Any, tellMaster = false)) + memoryStore.putBytes(blockId, 13000, () => { + fail("A big ByteBuffer that cannot be put into MemoryStore should not be created") + }) + } + + test("put a small ByteBuffer to MemoryStore") { + val (memoryStore, _) = makeMemoryStore(12000) + val blockId = BlockId("rdd_3_10") + var bytes: ChunkedByteBuffer = null + memoryStore.putBytes(blockId, 10000, () => { + bytes = new ChunkedByteBuffer(ByteBuffer.allocate(10000)) + bytes + }) + assert(memoryStore.getSize(blockId) === 10000) + } +} |