author     Josh Rosen <joshrosen@databricks.com>    2016-03-23 10:15:23 -0700
committer  Josh Rosen <joshrosen@databricks.com>    2016-03-23 10:15:23 -0700
commit     3de24ae2ed6c58fc96a7e50832afe42fe7af34fb (patch)
tree       0eb9f5d7100301195e6d0c1b77114e2398f6edb3 /core/src/test
parent     6ce008ba46aa1fc8a5c222ce0f25a6d81f53588e (diff)
[SPARK-14075] Refactor MemoryStore to be testable independent of BlockManager
This patch refactors the `MemoryStore` so that it can be tested without needing to construct / mock an entire `BlockManager`.

- The block manager's serialization- and compression-related methods have been moved from `BlockManager` to `SerializerManager`.
- `BlockInfoManager` is now passed directly to classes that need it, rather than being passed via the `BlockManager`.
- The `MemoryStore` now calls `dropFromMemory` via a new `BlockEvictionHandler` interface rather than directly calling the `BlockManager`. This change helps to enforce a narrow interface between the `MemoryStore` and `BlockManager` functionality and makes this interface easier to mock in tests.
- Several of the block unrolling tests have been moved from `BlockManagerSuite` into a new `MemoryStoreSuite`.

Author: Josh Rosen <joshrosen@databricks.com>

Closes #11899 from JoshRosen/reduce-memorystore-blockmanager-coupling.
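For orientation, here is a rough sketch of the narrow eviction interface this patch introduces, reconstructed from the `dropFromMemory` signature implemented in the new `MemoryStoreSuite` below; the package placement is inferred from that file's imports, and the exact modifiers in `MemoryStore.scala` are an assumption:

    package org.apache.spark.storage.memory

    import scala.reflect.ClassTag

    import org.apache.spark.storage.{BlockId, StorageLevel}
    import org.apache.spark.util.io.ChunkedByteBuffer

    // Sketch of the eviction callback the MemoryStore now depends on,
    // instead of calling back into the full BlockManager.
    private[storage] trait BlockEvictionHandler {
      // Drop a block from memory, returning the block's effective storage
      // level after the drop (StorageLevel.NONE if it is gone entirely).
      private[storage] def dropFromMemory[T: ClassTag](
          blockId: BlockId,
          data: () => Either[Array[T], ChunkedByteBuffer]): StorageLevel
    }

The new suite stubs this with a handler that simply removes the block and answers `StorageLevel.NONE`, which is what allows a `MemoryStore` to be constructed and exercised without any `BlockManager`.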
Diffstat (limited to 'core/src/test')
-rw-r--r--  core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java                 |  32
-rw-r--r--  core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java               |  17
-rw-r--r--  core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java |  12
-rw-r--r--  core/src/test/scala/org/apache/spark/DistributedSuite.scala                                    |   3
-rw-r--r--  core/src/test/scala/org/apache/spark/shuffle/BlockStoreShuffleReaderSuite.scala                |  22
-rw-r--r--  core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala                           | 197
-rw-r--r--  core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala                            | 302
7 files changed, 333 insertions(+), 252 deletions(-)
diff --git a/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java b/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java
index 47c695ad4e..44733dcdaf 100644
--- a/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java
+++ b/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java
@@ -70,6 +70,7 @@ public class UnsafeShuffleWriterSuite {
final LinkedList<File> spillFilesCreated = new LinkedList<>();
SparkConf conf;
final Serializer serializer = new KryoSerializer(new SparkConf());
+ final SerializerManager serializerManager = new SerializerManager(serializer, new SparkConf());
TaskMetrics taskMetrics;
@Mock(answer = RETURNS_SMART_NULLS) BlockManager blockManager;
@@ -111,7 +112,7 @@ public class UnsafeShuffleWriterSuite {
.set("spark.memory.offHeap.enabled", "false");
taskMetrics = new TaskMetrics();
memoryManager = new TestMemoryManager(conf);
- taskMemoryManager =  new TaskMemoryManager(memoryManager, 0);
+ taskMemoryManager = new TaskMemoryManager(memoryManager, 0);
when(blockManager.diskBlockManager()).thenReturn(diskBlockManager);
when(blockManager.getDiskWriter(
@@ -135,35 +136,6 @@ public class UnsafeShuffleWriterSuite {
);
}
});
- when(blockManager.wrapForCompression(any(BlockId.class), any(InputStream.class))).thenAnswer(
- new Answer<InputStream>() {
- @Override
- public InputStream answer(InvocationOnMock invocation) throws Throwable {
- assertTrue(invocation.getArguments()[0] instanceof TempShuffleBlockId);
- InputStream is = (InputStream) invocation.getArguments()[1];
- if (conf.getBoolean("spark.shuffle.compress", true)) {
- return CompressionCodec$.MODULE$.createCodec(conf).compressedInputStream(is);
- } else {
- return is;
- }
- }
- }
- );
-
- when(blockManager.wrapForCompression(any(BlockId.class), any(OutputStream.class))).thenAnswer(
- new Answer<OutputStream>() {
- @Override
- public OutputStream answer(InvocationOnMock invocation) throws Throwable {
- assertTrue(invocation.getArguments()[0] instanceof TempShuffleBlockId);
- OutputStream os = (OutputStream) invocation.getArguments()[1];
- if (conf.getBoolean("spark.shuffle.compress", true)) {
- return CompressionCodec$.MODULE$.createCodec(conf).compressedOutputStream(os);
- } else {
- return os;
- }
- }
- }
- );
when(shuffleBlockResolver.getDataFile(anyInt(), anyInt())).thenReturn(mergedOutputFile);
doAnswer(new Answer<Void>() {
diff --git a/core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java b/core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java
index 6667179b9d..449fb45c30 100644
--- a/core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java
+++ b/core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java
@@ -19,7 +19,6 @@ package org.apache.spark.unsafe.map;
import java.io.File;
import java.io.IOException;
-import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.*;
@@ -42,7 +41,9 @@ import org.apache.spark.executor.ShuffleWriteMetrics;
import org.apache.spark.memory.TaskMemoryManager;
import org.apache.spark.memory.TestMemoryManager;
import org.apache.spark.network.util.JavaUtils;
+import org.apache.spark.serializer.JavaSerializer;
import org.apache.spark.serializer.SerializerInstance;
+import org.apache.spark.serializer.SerializerManager;
import org.apache.spark.storage.*;
import org.apache.spark.unsafe.Platform;
import org.apache.spark.unsafe.array.ByteArrayMethods;
@@ -51,7 +52,6 @@ import org.apache.spark.util.Utils;
import static org.hamcrest.Matchers.greaterThan;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
-import static org.mockito.AdditionalAnswers.returnsSecondArg;
import static org.mockito.Answers.RETURNS_SMART_NULLS;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyInt;
@@ -64,6 +64,9 @@ public abstract class AbstractBytesToBytesMapSuite {
private TestMemoryManager memoryManager;
private TaskMemoryManager taskMemoryManager;
+ private SerializerManager serializerManager = new SerializerManager(
+ new JavaSerializer(new SparkConf()),
+ new SparkConf().set("spark.shuffle.spill.compress", "false"));
private static final long PAGE_SIZE_BYTES = 1L << 26; // 64 megabytes
final LinkedList<File> spillFilesCreated = new LinkedList<>();
@@ -85,7 +88,9 @@ public abstract class AbstractBytesToBytesMapSuite {
new TestMemoryManager(
new SparkConf()
.set("spark.memory.offHeap.enabled", "" + useOffHeapMemoryAllocator())
- .set("spark.memory.offHeap.size", "256mb"));
+ .set("spark.memory.offHeap.size", "256mb")
+ .set("spark.shuffle.spill.compress", "false")
+ .set("spark.shuffle.compress", "false"));
taskMemoryManager = new TaskMemoryManager(memoryManager, 0);
tempDir = Utils.createTempDir(System.getProperty("java.io.tmpdir"), "unsafe-test");
@@ -124,8 +129,6 @@ public abstract class AbstractBytesToBytesMapSuite {
);
}
});
- when(blockManager.wrapForCompression(any(BlockId.class), any(InputStream.class)))
- .then(returnsSecondArg());
}
@After
@@ -546,8 +549,8 @@ public abstract class AbstractBytesToBytesMapSuite {
@Test
public void spillInIterator() throws IOException {
- BytesToBytesMap map =
- new BytesToBytesMap(taskMemoryManager, blockManager, 1, 0.75, 1024, false);
+ BytesToBytesMap map = new BytesToBytesMap(
+ taskMemoryManager, blockManager, serializerManager, 1, 0.75, 1024, false);
try {
int i;
for (i = 0; i < 1024; i++) {
diff --git a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java
index db50e551f2..a2253d8559 100644
--- a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java
+++ b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java
@@ -19,7 +19,6 @@ package org.apache.spark.util.collection.unsafe.sort;
import java.io.File;
import java.io.IOException;
-import java.io.InputStream;
import java.io.OutputStream;
import java.util.Arrays;
import java.util.LinkedList;
@@ -43,14 +42,15 @@ import org.apache.spark.executor.ShuffleWriteMetrics;
import org.apache.spark.executor.TaskMetrics;
import org.apache.spark.memory.TestMemoryManager;
import org.apache.spark.memory.TaskMemoryManager;
+import org.apache.spark.serializer.JavaSerializer;
import org.apache.spark.serializer.SerializerInstance;
+import org.apache.spark.serializer.SerializerManager;
import org.apache.spark.storage.*;
import org.apache.spark.unsafe.Platform;
import org.apache.spark.util.Utils;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.junit.Assert.*;
-import static org.mockito.AdditionalAnswers.returnsSecondArg;
import static org.mockito.Answers.RETURNS_SMART_NULLS;
import static org.mockito.Mockito.*;
@@ -60,6 +60,9 @@ public class UnsafeExternalSorterSuite {
final TestMemoryManager memoryManager =
new TestMemoryManager(new SparkConf().set("spark.memory.offHeap.enabled", "false"));
final TaskMemoryManager taskMemoryManager = new TaskMemoryManager(memoryManager, 0);
+ final SerializerManager serializerManager = new SerializerManager(
+ new JavaSerializer(new SparkConf()),
+ new SparkConf().set("spark.shuffle.spill.compress", "false"));
// Use integer comparison for comparing prefixes (which are partition ids, in this case)
final PrefixComparator prefixComparator = new PrefixComparator() {
@Override
@@ -135,8 +138,6 @@ public class UnsafeExternalSorterSuite {
);
}
});
- when(blockManager.wrapForCompression(any(BlockId.class), any(InputStream.class)))
- .then(returnsSecondArg());
}
@After
@@ -172,6 +173,7 @@ public class UnsafeExternalSorterSuite {
return UnsafeExternalSorter.create(
taskMemoryManager,
blockManager,
+ serializerManager,
taskContext,
recordComparator,
prefixComparator,
@@ -374,6 +376,7 @@ public class UnsafeExternalSorterSuite {
final UnsafeExternalSorter sorter = UnsafeExternalSorter.create(
taskMemoryManager,
blockManager,
+ serializerManager,
taskContext,
null,
null,
@@ -408,6 +411,7 @@ public class UnsafeExternalSorterSuite {
final UnsafeExternalSorter sorter = UnsafeExternalSorter.create(
taskMemoryManager,
blockManager,
+ serializerManager,
taskContext,
recordComparator,
prefixComparator,
diff --git a/core/src/test/scala/org/apache/spark/DistributedSuite.scala b/core/src/test/scala/org/apache/spark/DistributedSuite.scala
index 2732cd6749..3dded4d486 100644
--- a/core/src/test/scala/org/apache/spark/DistributedSuite.scala
+++ b/core/src/test/scala/org/apache/spark/DistributedSuite.scala
@@ -194,10 +194,11 @@ class DistributedSuite extends SparkFunSuite with Matchers with LocalSparkContext
val blockId = blockIds(0)
val blockManager = SparkEnv.get.blockManager
val blockTransfer = SparkEnv.get.blockTransferService
+ val serializerManager = SparkEnv.get.serializerManager
blockManager.master.getLocations(blockId).foreach { cmId =>
val bytes = blockTransfer.fetchBlockSync(cmId.host, cmId.port, cmId.executorId,
blockId.toString)
- val deserialized = blockManager.dataDeserialize[Int](blockId,
+ val deserialized = serializerManager.dataDeserialize[Int](blockId,
new ChunkedByteBuffer(bytes.nioByteBuffer())).toList
assert(deserialized === (1 to 100).toList)
}
diff --git a/core/src/test/scala/org/apache/spark/shuffle/BlockStoreShuffleReaderSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/BlockStoreShuffleReaderSuite.scala
index 08f52c92e1..dba1172d5f 100644
--- a/core/src/test/scala/org/apache/spark/shuffle/BlockStoreShuffleReaderSuite.scala
+++ b/core/src/test/scala/org/apache/spark/shuffle/BlockStoreShuffleReaderSuite.scala
@@ -20,14 +20,11 @@ package org.apache.spark.shuffle
import java.io.{ByteArrayOutputStream, InputStream}
import java.nio.ByteBuffer
-import org.mockito.Matchers.{eq => meq, _}
import org.mockito.Mockito.{mock, when}
-import org.mockito.invocation.InvocationOnMock
-import org.mockito.stubbing.Answer
import org.apache.spark._
import org.apache.spark.network.buffer.{ManagedBuffer, NioManagedBuffer}
-import org.apache.spark.serializer.JavaSerializer
+import org.apache.spark.serializer.{JavaSerializer, SerializerManager}
import org.apache.spark.storage.{BlockManager, BlockManagerId, ShuffleBlockId}
/**
@@ -77,13 +74,6 @@ class BlockStoreShuffleReaderSuite extends SparkFunSuite with LocalSparkContext
// can ensure retain() and release() are properly called.
val blockManager = mock(classOf[BlockManager])
- // Create a return function to use for the mocked wrapForCompression method that just returns
- // the original input stream.
- val dummyCompressionFunction = new Answer[InputStream] {
- override def answer(invocation: InvocationOnMock): InputStream =
- invocation.getArguments()(1).asInstanceOf[InputStream]
- }
-
// Create a buffer with some randomly generated key-value pairs to use as the shuffle data
from each mapper (all mappers return the same shuffle data).
val byteOutputStream = new ByteArrayOutputStream()
@@ -105,9 +95,6 @@ class BlockStoreShuffleReaderSuite extends SparkFunSuite with LocalSparkContext
// fetch shuffle data.
val shuffleBlockId = ShuffleBlockId(shuffleId, mapId, reduceId)
when(blockManager.getBlockData(shuffleBlockId)).thenReturn(managedBuffer)
- when(blockManager.wrapForCompression(meq(shuffleBlockId), isA(classOf[InputStream])))
- .thenAnswer(dummyCompressionFunction)
-
managedBuffer
}
@@ -133,11 +120,18 @@ class BlockStoreShuffleReaderSuite extends SparkFunSuite with LocalSparkContext
new BaseShuffleHandle(shuffleId, numMaps, dependency)
}
+ val serializerManager = new SerializerManager(
+ serializer,
+ new SparkConf()
+ .set("spark.shuffle.compress", "false")
+ .set("spark.shuffle.spill.compress", "false"))
+
val shuffleReader = new BlockStoreShuffleReader(
shuffleHandle,
reduceId,
reduceId + 1,
TaskContext.empty(),
+ serializerManager,
blockManager,
mapOutputTracker)
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index 9419dfaa00..94f6f87740 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -1033,138 +1033,6 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterEach
assert(!store.memoryStore.contains(rdd(1, 0)), "rdd_1_0 was in store")
}
- test("reserve/release unroll memory") {
- store = makeBlockManager(12000)
- val memoryStore = store.memoryStore
- assert(memoryStore.currentUnrollMemory === 0)
- assert(memoryStore.currentUnrollMemoryForThisTask === 0)
-
- def reserveUnrollMemoryForThisTask(memory: Long): Boolean = {
- memoryStore.reserveUnrollMemoryForThisTask(TestBlockId(""), memory)
- }
-
- // Reserve
- assert(reserveUnrollMemoryForThisTask(100))
- assert(memoryStore.currentUnrollMemoryForThisTask === 100)
- assert(reserveUnrollMemoryForThisTask(200))
- assert(memoryStore.currentUnrollMemoryForThisTask === 300)
- assert(reserveUnrollMemoryForThisTask(500))
- assert(memoryStore.currentUnrollMemoryForThisTask === 800)
- assert(!reserveUnrollMemoryForThisTask(1000000))
- assert(memoryStore.currentUnrollMemoryForThisTask === 800) // not granted
- // Release
- memoryStore.releaseUnrollMemoryForThisTask(100)
- assert(memoryStore.currentUnrollMemoryForThisTask === 700)
- memoryStore.releaseUnrollMemoryForThisTask(100)
- assert(memoryStore.currentUnrollMemoryForThisTask === 600)
- // Reserve again
- assert(reserveUnrollMemoryForThisTask(4400))
- assert(memoryStore.currentUnrollMemoryForThisTask === 5000)
- assert(!reserveUnrollMemoryForThisTask(20000))
- assert(memoryStore.currentUnrollMemoryForThisTask === 5000) // not granted
- // Release again
- memoryStore.releaseUnrollMemoryForThisTask(1000)
- assert(memoryStore.currentUnrollMemoryForThisTask === 4000)
- memoryStore.releaseUnrollMemoryForThisTask() // release all
- assert(memoryStore.currentUnrollMemoryForThisTask === 0)
- }
-
- test("safely unroll blocks") {
- store = makeBlockManager(12000)
- val smallList = List.fill(40)(new Array[Byte](100))
- val bigList = List.fill(40)(new Array[Byte](1000))
- val memoryStore = store.memoryStore
- assert(memoryStore.currentUnrollMemoryForThisTask === 0)
-
- // Unroll with all the space in the world. This should succeed.
- var putResult =
- memoryStore.putIterator("unroll", smallList.iterator, StorageLevel.MEMORY_ONLY, ClassTag.Any)
- assert(putResult.isRight)
- assert(memoryStore.currentUnrollMemoryForThisTask === 0)
- smallList.iterator.zip(memoryStore.getValues("unroll").get).foreach { case (e, a) =>
- assert(e === a, "getValues() did not return original values!")
- }
- assert(memoryStore.remove("unroll"))
-
- // Unroll with not enough space. This should succeed after kicking out someBlock1.
- assert(store.putIterator("someBlock1", smallList.iterator, StorageLevel.MEMORY_ONLY))
- assert(store.putIterator("someBlock2", smallList.iterator, StorageLevel.MEMORY_ONLY))
- putResult =
- memoryStore.putIterator("unroll", smallList.iterator, StorageLevel.MEMORY_ONLY, ClassTag.Any)
- assert(putResult.isRight)
- assert(memoryStore.currentUnrollMemoryForThisTask === 0)
- assert(memoryStore.contains("someBlock2"))
- assert(!memoryStore.contains("someBlock1"))
- smallList.iterator.zip(memoryStore.getValues("unroll").get).foreach { case (e, a) =>
- assert(e === a, "getValues() did not return original values!")
- }
- assert(memoryStore.remove("unroll"))
-
- // Unroll huge block with not enough space. Even after ensuring free space of 12000 * 0.4 =
- // 4800 bytes, there is still not enough room to unroll this block. This returns an iterator.
- // In the mean time, however, we kicked out someBlock2 before giving up.
- assert(store.putIterator("someBlock3", smallList.iterator, StorageLevel.MEMORY_ONLY))
- putResult =
- memoryStore.putIterator("unroll", bigList.iterator, StorageLevel.MEMORY_ONLY, ClassTag.Any)
- assert(memoryStore.currentUnrollMemoryForThisTask > 0) // we returned an iterator
- assert(!memoryStore.contains("someBlock2"))
- assert(putResult.isLeft)
- bigList.iterator.zip(putResult.left.get).foreach { case (e, a) =>
- assert(e === a, "putIterator() did not return original values!")
- }
- // The unroll memory was freed once the iterator returned by putIterator() was fully traversed.
- assert(memoryStore.currentUnrollMemoryForThisTask === 0)
- }
-
- test("safely unroll blocks through putIterator") {
- store = makeBlockManager(12000)
- val memOnly = StorageLevel.MEMORY_ONLY
- val memoryStore = store.memoryStore
- val smallList = List.fill(40)(new Array[Byte](100))
- val bigList = List.fill(40)(new Array[Byte](1000))
- def smallIterator: Iterator[Any] = smallList.iterator.asInstanceOf[Iterator[Any]]
- def bigIterator: Iterator[Any] = bigList.iterator.asInstanceOf[Iterator[Any]]
- assert(memoryStore.currentUnrollMemoryForThisTask === 0)
-
- // Unroll with plenty of space. This should succeed and cache both blocks.
- val result1 = memoryStore.putIterator("b1", smallIterator, memOnly, ClassTag.Any)
- val result2 = memoryStore.putIterator("b2", smallIterator, memOnly, ClassTag.Any)
- assert(memoryStore.contains("b1"))
- assert(memoryStore.contains("b2"))
- assert(result1.isRight) // unroll was successful
- assert(result2.isRight)
- assert(memoryStore.currentUnrollMemoryForThisTask === 0)
-
- // Re-put these two blocks so block manager knows about them too. Otherwise, block manager
- // would not know how to drop them from memory later.
- memoryStore.remove("b1")
- memoryStore.remove("b2")
- store.putIterator("b1", smallIterator, memOnly)
- store.putIterator("b2", smallIterator, memOnly)
-
- // Unroll with not enough space. This should succeed but kick out b1 in the process.
- val result3 = memoryStore.putIterator("b3", smallIterator, memOnly, ClassTag.Any)
- assert(result3.isRight)
- assert(!memoryStore.contains("b1"))
- assert(memoryStore.contains("b2"))
- assert(memoryStore.contains("b3"))
- assert(memoryStore.currentUnrollMemoryForThisTask === 0)
- memoryStore.remove("b3")
- store.putIterator("b3", smallIterator, memOnly)
-
- // Unroll huge block with not enough space. This should fail and kick out b2 in the process.
- val result4 = memoryStore.putIterator("b4", bigIterator, memOnly, ClassTag.Any)
- assert(result4.isLeft) // unroll was unsuccessful
- assert(!memoryStore.contains("b1"))
- assert(!memoryStore.contains("b2"))
- assert(memoryStore.contains("b3"))
- assert(!memoryStore.contains("b4"))
- assert(memoryStore.currentUnrollMemoryForThisTask > 0) // we returned an iterator
- }
-
- /**
- * This test is essentially identical to the preceding one, except that it uses MEMORY_AND_DISK.
- */
test("safely unroll blocks through putIterator (disk)") {
store = makeBlockManager(12000)
val memAndDisk = StorageLevel.MEMORY_AND_DISK
@@ -1203,72 +1071,9 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterEach
assert(!memoryStore.contains("b2"))
assert(memoryStore.contains("b3"))
assert(!memoryStore.contains("b4"))
- assert(memoryStore.currentUnrollMemoryForThisTask > 0) // we returned an iterator
- result4.left.get.close()
- assert(memoryStore.currentUnrollMemoryForThisTask === 0) // close released the unroll memory
- }
-
- test("multiple unrolls by the same thread") {
- store = makeBlockManager(12000)
- val memOnly = StorageLevel.MEMORY_ONLY
- val memoryStore = store.memoryStore
- val smallList = List.fill(40)(new Array[Byte](100))
- def smallIterator: Iterator[Any] = smallList.iterator.asInstanceOf[Iterator[Any]]
- assert(memoryStore.currentUnrollMemoryForThisTask === 0)
-
- // All unroll memory used is released because putIterator did not return an iterator
- assert(memoryStore.putIterator("b1", smallIterator, memOnly, ClassTag.Any).isRight)
- assert(memoryStore.currentUnrollMemoryForThisTask === 0)
- assert(memoryStore.putIterator("b2", smallIterator, memOnly, ClassTag.Any).isRight)
- assert(memoryStore.currentUnrollMemoryForThisTask === 0)
-
- // Unroll memory is not released because putIterator returned an iterator
- // that still depends on the underlying vector used in the process
- assert(memoryStore.putIterator("b3", smallIterator, memOnly, ClassTag.Any).isLeft)
- val unrollMemoryAfterB3 = memoryStore.currentUnrollMemoryForThisTask
- assert(unrollMemoryAfterB3 > 0)
-
- // The unroll memory owned by this thread builds on top of its value after the previous unrolls
- assert(memoryStore.putIterator("b4", smallIterator, memOnly, ClassTag.Any).isLeft)
- val unrollMemoryAfterB4 = memoryStore.currentUnrollMemoryForThisTask
- assert(unrollMemoryAfterB4 > unrollMemoryAfterB3)
-
- // ... but only to a certain extent (until we run out of free space to grant new unroll memory)
- assert(memoryStore.putIterator("b5", smallIterator, memOnly, ClassTag.Any).isLeft)
- val unrollMemoryAfterB5 = memoryStore.currentUnrollMemoryForThisTask
- assert(memoryStore.putIterator("b6", smallIterator, memOnly, ClassTag.Any).isLeft)
- val unrollMemoryAfterB6 = memoryStore.currentUnrollMemoryForThisTask
- assert(memoryStore.putIterator("b7", smallIterator, memOnly, ClassTag.Any).isLeft)
- val unrollMemoryAfterB7 = memoryStore.currentUnrollMemoryForThisTask
- assert(unrollMemoryAfterB5 === unrollMemoryAfterB4)
- assert(unrollMemoryAfterB6 === unrollMemoryAfterB4)
- assert(unrollMemoryAfterB7 === unrollMemoryAfterB4)
- }
-
- test("lazily create a big ByteBuffer to avoid OOM if it cannot be put into MemoryStore") {
- store = makeBlockManager(12000)
- val memoryStore = store.memoryStore
- val blockId = BlockId("rdd_3_10")
- store.blockInfoManager.lockNewBlockForWriting(
- blockId, new BlockInfo(StorageLevel.MEMORY_ONLY, ClassTag.Any, tellMaster = false))
- memoryStore.putBytes(blockId, 13000, () => {
- fail("A big ByteBuffer that cannot be put into MemoryStore should not be created")
- })
- }
-
- test("put a small ByteBuffer to MemoryStore") {
- store = makeBlockManager(12000)
- val memoryStore = store.memoryStore
- val blockId = BlockId("rdd_3_10")
- var bytes: ChunkedByteBuffer = null
- memoryStore.putBytes(blockId, 10000, () => {
- bytes = new ChunkedByteBuffer(ByteBuffer.allocate(10000))
- bytes
- })
- assert(memoryStore.getSize(blockId) === 10000)
}
- test("read-locked blocks cannot be evicted from the MemoryStore") {
+ test("read-locked blocks cannot be evicted from memory") {
store = makeBlockManager(12000)
val arr = new Array[Byte](4000)
// First store a1 and a2, both in memory, and a3, on disk only
diff --git a/core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala b/core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala
new file mode 100644
index 0000000000..b4ab67ca15
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala
@@ -0,0 +1,302 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.storage
+
+import java.nio.ByteBuffer
+
+import scala.language.implicitConversions
+import scala.language.postfixOps
+import scala.language.reflectiveCalls
+import scala.reflect.ClassTag
+
+import org.scalatest._
+
+import org.apache.spark._
+import org.apache.spark.memory.StaticMemoryManager
+import org.apache.spark.serializer.{KryoSerializer, SerializerManager}
+import org.apache.spark.storage.memory.{BlockEvictionHandler, MemoryStore, PartiallyUnrolledIterator}
+import org.apache.spark.util._
+import org.apache.spark.util.io.ChunkedByteBuffer
+
+class MemoryStoreSuite
+ extends SparkFunSuite
+ with PrivateMethodTester
+ with BeforeAndAfterEach
+ with ResetSystemProperties {
+
+ var conf: SparkConf = new SparkConf(false)
+ .set("spark.test.useCompressedOops", "true")
+ .set("spark.storage.unrollFraction", "0.4")
+ .set("spark.storage.unrollMemoryThreshold", "512")
+
+ // Reuse a serializer across tests to avoid creating a new thread-local buffer on each test
+ val serializer = new KryoSerializer(new SparkConf(false).set("spark.kryoserializer.buffer", "1m"))
+
+ // Implicitly convert strings to BlockIds for test clarity.
+ implicit def StringToBlockId(value: String): BlockId = new TestBlockId(value)
+ def rdd(rddId: Int, splitId: Int): RDDBlockId = RDDBlockId(rddId, splitId)
+
+ override def beforeEach(): Unit = {
+ super.beforeEach()
+ // Set the arch to 64-bit and compressedOops to true to get a deterministic test-case
+ System.setProperty("os.arch", "amd64")
+ val initialize = PrivateMethod[Unit]('initialize)
+ SizeEstimator invokePrivate initialize()
+ }
+
+ def makeMemoryStore(maxMem: Long): (MemoryStore, BlockInfoManager) = {
+ val memManager = new StaticMemoryManager(conf, Long.MaxValue, maxMem, numCores = 1)
+ val serializerManager = new SerializerManager(serializer, conf)
+ val blockInfoManager = new BlockInfoManager
+ val blockEvictionHandler = new BlockEvictionHandler {
+ var memoryStore: MemoryStore = _
+ override private[storage] def dropFromMemory[T: ClassTag](
+ blockId: BlockId,
+ data: () => Either[Array[T], ChunkedByteBuffer]): StorageLevel = {
+ memoryStore.remove(blockId)
+ StorageLevel.NONE
+ }
+ }
+ val memoryStore =
+ new MemoryStore(conf, blockInfoManager, serializerManager, memManager, blockEvictionHandler)
+ memManager.setMemoryStore(memoryStore)
+ blockEvictionHandler.memoryStore = memoryStore
+ (memoryStore, blockInfoManager)
+ }
+
+ test("reserve/release unroll memory") {
+ val (memoryStore, _) = makeMemoryStore(12000)
+ assert(memoryStore.currentUnrollMemory === 0)
+ assert(memoryStore.currentUnrollMemoryForThisTask === 0)
+
+ def reserveUnrollMemoryForThisTask(memory: Long): Boolean = {
+ memoryStore.reserveUnrollMemoryForThisTask(TestBlockId(""), memory)
+ }
+
+ // Reserve
+ assert(reserveUnrollMemoryForThisTask(100))
+ assert(memoryStore.currentUnrollMemoryForThisTask === 100)
+ assert(reserveUnrollMemoryForThisTask(200))
+ assert(memoryStore.currentUnrollMemoryForThisTask === 300)
+ assert(reserveUnrollMemoryForThisTask(500))
+ assert(memoryStore.currentUnrollMemoryForThisTask === 800)
+ assert(!reserveUnrollMemoryForThisTask(1000000))
+ assert(memoryStore.currentUnrollMemoryForThisTask === 800) // not granted
+ // Release
+ memoryStore.releaseUnrollMemoryForThisTask(100)
+ assert(memoryStore.currentUnrollMemoryForThisTask === 700)
+ memoryStore.releaseUnrollMemoryForThisTask(100)
+ assert(memoryStore.currentUnrollMemoryForThisTask === 600)
+ // Reserve again
+ assert(reserveUnrollMemoryForThisTask(4400))
+ assert(memoryStore.currentUnrollMemoryForThisTask === 5000)
+ assert(!reserveUnrollMemoryForThisTask(20000))
+ assert(memoryStore.currentUnrollMemoryForThisTask === 5000) // not granted
+ // Release again
+ memoryStore.releaseUnrollMemoryForThisTask(1000)
+ assert(memoryStore.currentUnrollMemoryForThisTask === 4000)
+ memoryStore.releaseUnrollMemoryForThisTask() // release all
+ assert(memoryStore.currentUnrollMemoryForThisTask === 0)
+ }
+
+ test("safely unroll blocks") {
+ val smallList = List.fill(40)(new Array[Byte](100))
+ val bigList = List.fill(40)(new Array[Byte](1000))
+ val ct = implicitly[ClassTag[Array[Byte]]]
+ val (memoryStore, blockInfoManager) = makeMemoryStore(12000)
+ assert(memoryStore.currentUnrollMemoryForThisTask === 0)
+
+ def putIterator[T](
+ blockId: BlockId,
+ iter: Iterator[T],
+ classTag: ClassTag[T]): Either[PartiallyUnrolledIterator[T], Long] = {
+ assert(blockInfoManager.lockNewBlockForWriting(
+ blockId,
+ new BlockInfo(StorageLevel.MEMORY_ONLY, classTag, tellMaster = false)))
+ val res = memoryStore.putIterator(blockId, iter, StorageLevel.MEMORY_ONLY, classTag)
+ blockInfoManager.unlock(blockId)
+ res
+ }
+
+ // Unroll with all the space in the world. This should succeed.
+ var putResult = putIterator("unroll", smallList.iterator, ClassTag.Any)
+ assert(putResult.isRight)
+ assert(memoryStore.currentUnrollMemoryForThisTask === 0)
+ smallList.iterator.zip(memoryStore.getValues("unroll").get).foreach { case (e, a) =>
+ assert(e === a, "getValues() did not return original values!")
+ }
+ blockInfoManager.lockForWriting("unroll")
+ assert(memoryStore.remove("unroll"))
+ blockInfoManager.removeBlock("unroll")
+
+ // Unroll with not enough space. This should succeed after kicking out someBlock1.
+ assert(putIterator("someBlock1", smallList.iterator, ct).isRight)
+ assert(putIterator("someBlock2", smallList.iterator, ct).isRight)
+ putResult = putIterator("unroll", smallList.iterator, ClassTag.Any)
+ assert(putResult.isRight)
+ assert(memoryStore.currentUnrollMemoryForThisTask === 0)
+ assert(memoryStore.contains("someBlock2"))
+ assert(!memoryStore.contains("someBlock1"))
+ smallList.iterator.zip(memoryStore.getValues("unroll").get).foreach { case (e, a) =>
+ assert(e === a, "getValues() did not return original values!")
+ }
+ blockInfoManager.lockForWriting("unroll")
+ assert(memoryStore.remove("unroll"))
+ blockInfoManager.removeBlock("unroll")
+
+ // Unroll huge block with not enough space. Even after ensuring free space of 12000 * 0.4 =
+ // 4800 bytes, there is still not enough room to unroll this block. This returns an iterator.
+ // In the meantime, however, we kicked out someBlock2 before giving up.
+ assert(putIterator("someBlock3", smallList.iterator, ct).isRight)
+ putResult = putIterator("unroll", bigList.iterator, ClassTag.Any)
+ assert(memoryStore.currentUnrollMemoryForThisTask > 0) // we returned an iterator
+ assert(!memoryStore.contains("someBlock2"))
+ assert(putResult.isLeft)
+ bigList.iterator.zip(putResult.left.get).foreach { case (e, a) =>
+ assert(e === a, "putIterator() did not return original values!")
+ }
+ // The unroll memory was freed once the iterator returned by putIterator() was fully traversed.
+ assert(memoryStore.currentUnrollMemoryForThisTask === 0)
+ }
+
+ test("safely unroll blocks through putIterator") {
+ val (memoryStore, blockInfoManager) = makeMemoryStore(12000)
+ val smallList = List.fill(40)(new Array[Byte](100))
+ val bigList = List.fill(40)(new Array[Byte](1000))
+ def smallIterator: Iterator[Any] = smallList.iterator.asInstanceOf[Iterator[Any]]
+ def bigIterator: Iterator[Any] = bigList.iterator.asInstanceOf[Iterator[Any]]
+ assert(memoryStore.currentUnrollMemoryForThisTask === 0)
+
+ def putIterator[T](
+ blockId: BlockId,
+ iter: Iterator[T],
+ classTag: ClassTag[T]): Either[PartiallyUnrolledIterator[T], Long] = {
+ assert(blockInfoManager.lockNewBlockForWriting(
+ blockId,
+ new BlockInfo(StorageLevel.MEMORY_ONLY, classTag, tellMaster = false)))
+ val res = memoryStore.putIterator(blockId, iter, StorageLevel.MEMORY_ONLY, classTag)
+ blockInfoManager.unlock(blockId)
+ res
+ }
+
+ // Unroll with plenty of space. This should succeed and cache both blocks.
+ val result1 = putIterator("b1", smallIterator, ClassTag.Any)
+ val result2 = putIterator("b2", smallIterator, ClassTag.Any)
+ assert(memoryStore.contains("b1"))
+ assert(memoryStore.contains("b2"))
+ assert(result1.isRight) // unroll was successful
+ assert(result2.isRight)
+ assert(memoryStore.currentUnrollMemoryForThisTask === 0)
+
+ // Re-put these two blocks so block manager knows about them too. Otherwise, block manager
+ // would not know how to drop them from memory later.
+ blockInfoManager.lockForWriting("b1")
+ memoryStore.remove("b1")
+ blockInfoManager.removeBlock("b1")
+ blockInfoManager.lockForWriting("b2")
+ memoryStore.remove("b2")
+ blockInfoManager.removeBlock("b2")
+ putIterator("b1", smallIterator, ClassTag.Any)
+ putIterator("b2", smallIterator, ClassTag.Any)
+
+ // Unroll with not enough space. This should succeed but kick out b1 in the process.
+ val result3 = putIterator("b3", smallIterator, ClassTag.Any)
+ assert(result3.isRight)
+ assert(!memoryStore.contains("b1"))
+ assert(memoryStore.contains("b2"))
+ assert(memoryStore.contains("b3"))
+ assert(memoryStore.currentUnrollMemoryForThisTask === 0)
+ blockInfoManager.lockForWriting("b3")
+ assert(memoryStore.remove("b3"))
+ blockInfoManager.removeBlock("b3")
+ putIterator("b3", smallIterator, ClassTag.Any)
+
+ // Unroll huge block with not enough space. This should fail and kick out b2 in the process.
+ val result4 = putIterator("b4", bigIterator, ClassTag.Any)
+ assert(result4.isLeft) // unroll was unsuccessful
+ assert(!memoryStore.contains("b1"))
+ assert(!memoryStore.contains("b2"))
+ assert(memoryStore.contains("b3"))
+ assert(!memoryStore.contains("b4"))
+ assert(memoryStore.currentUnrollMemoryForThisTask > 0) // we returned an iterator
+ result4.left.get.close()
+ assert(memoryStore.currentUnrollMemoryForThisTask === 0) // close released the unroll memory
+ }
+
+ test("multiple unrolls by the same thread") {
+ val (memoryStore, _) = makeMemoryStore(12000)
+ val smallList = List.fill(40)(new Array[Byte](100))
+ def smallIterator: Iterator[Any] = smallList.iterator.asInstanceOf[Iterator[Any]]
+ assert(memoryStore.currentUnrollMemoryForThisTask === 0)
+
+ def putIterator(
+ blockId: BlockId,
+ iter: Iterator[Any]): Either[PartiallyUnrolledIterator[Any], Long] = {
+ memoryStore.putIterator(blockId, iter, StorageLevel.MEMORY_ONLY, ClassTag.Any)
+ }
+
+ // All unroll memory used is released because putIterator did not return an iterator
+ assert(putIterator("b1", smallIterator).isRight)
+ assert(memoryStore.currentUnrollMemoryForThisTask === 0)
+ assert(putIterator("b2", smallIterator).isRight)
+ assert(memoryStore.currentUnrollMemoryForThisTask === 0)
+
+ // Unroll memory is not released because putIterator returned an iterator
+ // that still depends on the underlying vector used in the process
+ assert(putIterator("b3", smallIterator).isLeft)
+ val unrollMemoryAfterB3 = memoryStore.currentUnrollMemoryForThisTask
+ assert(unrollMemoryAfterB3 > 0)
+
+ // The unroll memory owned by this thread builds on top of its value after the previous unrolls
+ assert(putIterator("b4", smallIterator).isLeft)
+ val unrollMemoryAfterB4 = memoryStore.currentUnrollMemoryForThisTask
+ assert(unrollMemoryAfterB4 > unrollMemoryAfterB3)
+
+ // ... but only to a certain extent (until we run out of free space to grant new unroll memory)
+ assert(putIterator("b5", smallIterator).isLeft)
+ val unrollMemoryAfterB5 = memoryStore.currentUnrollMemoryForThisTask
+ assert(putIterator("b6", smallIterator).isLeft)
+ val unrollMemoryAfterB6 = memoryStore.currentUnrollMemoryForThisTask
+ assert(putIterator("b7", smallIterator).isLeft)
+ val unrollMemoryAfterB7 = memoryStore.currentUnrollMemoryForThisTask
+ assert(unrollMemoryAfterB5 === unrollMemoryAfterB4)
+ assert(unrollMemoryAfterB6 === unrollMemoryAfterB4)
+ assert(unrollMemoryAfterB7 === unrollMemoryAfterB4)
+ }
+
+ test("lazily create a big ByteBuffer to avoid OOM if it cannot be put into MemoryStore") {
+ val (memoryStore, blockInfoManager) = makeMemoryStore(12000)
+ val blockId = BlockId("rdd_3_10")
+ blockInfoManager.lockNewBlockForWriting(
+ blockId, new BlockInfo(StorageLevel.MEMORY_ONLY, ClassTag.Any, tellMaster = false))
+ memoryStore.putBytes(blockId, 13000, () => {
+ fail("A big ByteBuffer that cannot be put into MemoryStore should not be created")
+ })
+ }
+
+ test("put a small ByteBuffer to MemoryStore") {
+ val (memoryStore, _) = makeMemoryStore(12000)
+ val blockId = BlockId("rdd_3_10")
+ var bytes: ChunkedByteBuffer = null
+ memoryStore.putBytes(blockId, 10000, () => {
+ bytes = new ChunkedByteBuffer(ByteBuffer.allocate(10000))
+ bytes
+ })
+ assert(memoryStore.getSize(blockId) === 10000)
+ }
+}