author     Josh Rosen <joshrosen@databricks.com>    2016-03-23 10:15:23 -0700
committer  Josh Rosen <joshrosen@databricks.com>    2016-03-23 10:15:23 -0700
commit     3de24ae2ed6c58fc96a7e50832afe42fe7af34fb
tree       0eb9f5d7100301195e6d0c1b77114e2398f6edb3 /core/src/main/java
parent     6ce008ba46aa1fc8a5c222ce0f25a6d81f53588e
[SPARK-14075] Refactor MemoryStore to be testable independent of BlockManager
This patch refactors the `MemoryStore` so that it can be tested without needing to construct / mock an entire `BlockManager`.
- The block manager's serialization- and compression-related methods have been moved from `BlockManager` to `SerializerManager`.
- `BlockInfoManager` is now passed directly to classes that need it, rather than being passed via the `BlockManager`.
- The `MemoryStore` now calls `dropFromMemory` via a new `BlockEvictionHandler` interface rather than directly calling the `BlockManager`. This change helps to enforce a narrow interface between the `MemoryStore` and `BlockManager` functionality and makes this interface easier to mock in tests (a sketch of this pattern follows the list).
- Several of the block unrolling tests have been moved from `BlockManagerSuite` into a new `MemoryStoreSuite`.
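To make the new seam concrete, here is a minimal, hypothetical Scala sketch of the eviction-handler pattern described above. The trait name mirrors the commit description, but the method signature and the recording stub are illustrative assumptions rather than the actual Spark source.

```scala
// Hypothetical sketch only: the real BlockEvictionHandler lives in
// org.apache.spark.storage and carries a richer dropFromMemory signature;
// this stripped-down version just shows the shape of the seam.
import org.apache.spark.storage.{BlockId, StorageLevel}

trait BlockEvictionHandler {
  // Invoked by the MemoryStore when it decides a block must be evicted.
  def dropFromMemory(blockId: BlockId): StorageLevel
}

// In a test, a stub handler can stand in for the BlockManager and simply
// record which blocks the MemoryStore asked to evict.
class RecordingEvictionHandler extends BlockEvictionHandler {
  val evicted = scala.collection.mutable.ArrayBuffer.empty[BlockId]

  override def dropFromMemory(blockId: BlockId): StorageLevel = {
    evicted += blockId
    StorageLevel.NONE // pretend the block was dropped entirely
  }
}
```

Because the `MemoryStore` depends only on this narrow interface, a suite such as `MemoryStoreSuite` can drive eviction paths with a stub like the one above instead of standing up a full `BlockManager`.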
Author: Josh Rosen <joshrosen@databricks.com>
Closes #11899 from JoshRosen/reduce-memorystore-blockmanager-coupling.
Diffstat (limited to 'core/src/main/java')
4 files changed, 24 insertions, 11 deletions
diff --git a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
index de36814ecc..9aacb084f6 100644
--- a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
+++ b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
@@ -32,6 +32,7 @@ import org.apache.spark.SparkEnv;
 import org.apache.spark.executor.ShuffleWriteMetrics;
 import org.apache.spark.memory.MemoryConsumer;
 import org.apache.spark.memory.TaskMemoryManager;
+import org.apache.spark.serializer.SerializerManager;
 import org.apache.spark.storage.BlockManager;
 import org.apache.spark.unsafe.Platform;
 import org.apache.spark.unsafe.array.ByteArrayMethods;
@@ -163,12 +164,14 @@ public final class BytesToBytesMap extends MemoryConsumer {
   private long peakMemoryUsedBytes = 0L;

   private final BlockManager blockManager;
+  private final SerializerManager serializerManager;
   private volatile MapIterator destructiveIterator = null;
   private LinkedList<UnsafeSorterSpillWriter> spillWriters = new LinkedList<>();

   public BytesToBytesMap(
       TaskMemoryManager taskMemoryManager,
       BlockManager blockManager,
+      SerializerManager serializerManager,
       int initialCapacity,
       double loadFactor,
       long pageSizeBytes,
@@ -176,6 +179,7 @@ public final class BytesToBytesMap extends MemoryConsumer {
     super(taskMemoryManager, pageSizeBytes);
     this.taskMemoryManager = taskMemoryManager;
     this.blockManager = blockManager;
+    this.serializerManager = serializerManager;
     this.loadFactor = loadFactor;
     this.loc = new Location();
     this.pageSizeBytes = pageSizeBytes;
@@ -209,6 +213,7 @@ public final class BytesToBytesMap extends MemoryConsumer {
     this(
       taskMemoryManager,
       SparkEnv.get() != null ? SparkEnv.get().blockManager() : null,
+      SparkEnv.get() != null ? SparkEnv.get().serializerManager() : null,
       initialCapacity,
       0.70,
       pageSizeBytes,
@@ -271,7 +276,7 @@ public final class BytesToBytesMap extends MemoryConsumer {
       }
       try {
         Closeables.close(reader, /* swallowIOException = */ false);
-        reader = spillWriters.getFirst().getReader(blockManager);
+        reader = spillWriters.getFirst().getReader(serializerManager);
         recordsInPage = -1;
       } catch (IOException e) {
         // Scala iterator does not handle exception
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
index 927b19c4e8..ded8f0472b 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
@@ -31,6 +31,7 @@ import org.apache.spark.TaskContext;
 import org.apache.spark.executor.ShuffleWriteMetrics;
 import org.apache.spark.memory.MemoryConsumer;
 import org.apache.spark.memory.TaskMemoryManager;
+import org.apache.spark.serializer.SerializerManager;
 import org.apache.spark.storage.BlockManager;
 import org.apache.spark.unsafe.Platform;
 import org.apache.spark.unsafe.array.LongArray;
@@ -51,6 +52,7 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
   private final RecordComparator recordComparator;
   private final TaskMemoryManager taskMemoryManager;
   private final BlockManager blockManager;
+  private final SerializerManager serializerManager;
   private final TaskContext taskContext;
   private ShuffleWriteMetrics writeMetrics;

@@ -78,6 +80,7 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
   public static UnsafeExternalSorter createWithExistingInMemorySorter(
       TaskMemoryManager taskMemoryManager,
       BlockManager blockManager,
+      SerializerManager serializerManager,
       TaskContext taskContext,
       RecordComparator recordComparator,
       PrefixComparator prefixComparator,
@@ -85,7 +88,8 @@
       long pageSizeBytes,
       UnsafeInMemorySorter inMemorySorter) throws IOException {
     UnsafeExternalSorter sorter = new UnsafeExternalSorter(taskMemoryManager, blockManager,
-      taskContext, recordComparator, prefixComparator, initialSize, pageSizeBytes, inMemorySorter);
+      serializerManager, taskContext, recordComparator, prefixComparator, initialSize,
+      pageSizeBytes, inMemorySorter);
     sorter.spill(Long.MAX_VALUE, sorter);
     // The external sorter will be used to insert records, in-memory sorter is not needed.
     sorter.inMemSorter = null;
@@ -95,18 +99,20 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
   public static UnsafeExternalSorter create(
       TaskMemoryManager taskMemoryManager,
       BlockManager blockManager,
+      SerializerManager serializerManager,
       TaskContext taskContext,
       RecordComparator recordComparator,
       PrefixComparator prefixComparator,
       int initialSize,
       long pageSizeBytes) {
-    return new UnsafeExternalSorter(taskMemoryManager, blockManager,
+    return new UnsafeExternalSorter(taskMemoryManager, blockManager, serializerManager,
       taskContext, recordComparator, prefixComparator, initialSize, pageSizeBytes, null);
   }

   private UnsafeExternalSorter(
       TaskMemoryManager taskMemoryManager,
       BlockManager blockManager,
+      SerializerManager serializerManager,
       TaskContext taskContext,
       RecordComparator recordComparator,
       PrefixComparator prefixComparator,
@@ -116,6 +122,7 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
     super(taskMemoryManager, pageSizeBytes);
     this.taskMemoryManager = taskMemoryManager;
     this.blockManager = blockManager;
+    this.serializerManager = serializerManager;
     this.taskContext = taskContext;
     this.recordComparator = recordComparator;
     this.prefixComparator = prefixComparator;
@@ -412,7 +419,7 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
       final UnsafeSorterSpillMerger spillMerger =
         new UnsafeSorterSpillMerger(recordComparator, prefixComparator, spillWriters.size());
       for (UnsafeSorterSpillWriter spillWriter : spillWriters) {
-        spillMerger.addSpillIfNotEmpty(spillWriter.getReader(blockManager));
+        spillMerger.addSpillIfNotEmpty(spillWriter.getReader(serializerManager));
       }
       if (inMemSorter != null) {
         readingIterator = new SpillableIterator(inMemSorter.getSortedIterator());
@@ -463,7 +470,7 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
         }
         spillWriter.close();
         spillWriters.add(spillWriter);
-        nextUpstream = spillWriter.getReader(blockManager);
+        nextUpstream = spillWriter.getReader(serializerManager);

         long released = 0L;
         synchronized (UnsafeExternalSorter.this) {
@@ -549,7 +556,7 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
     } else {
       LinkedList<UnsafeSorterIterator> queue = new LinkedList<>();
       for (UnsafeSorterSpillWriter spillWriter : spillWriters) {
-        queue.add(spillWriter.getReader(blockManager));
+        queue.add(spillWriter.getReader(serializerManager));
       }
       if (inMemSorter != null) {
         queue.add(inMemSorter.getSortedIterator());
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java
index 20ee1c8eb0..1d588c37c5 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java
@@ -22,8 +22,8 @@ import java.io.*;
 import com.google.common.io.ByteStreams;
 import com.google.common.io.Closeables;

+import org.apache.spark.serializer.SerializerManager;
 import org.apache.spark.storage.BlockId;
-import org.apache.spark.storage.BlockManager;
 import org.apache.spark.unsafe.Platform;

 /**
@@ -46,13 +46,13 @@ public final class UnsafeSorterSpillReader extends UnsafeSorterIterator implemen
   private final long baseOffset = Platform.BYTE_ARRAY_OFFSET;

   public UnsafeSorterSpillReader(
-      BlockManager blockManager,
+      SerializerManager serializerManager,
       File file,
       BlockId blockId) throws IOException {
     assert (file.length() > 0);
     final BufferedInputStream bs = new BufferedInputStream(new FileInputStream(file));
     try {
-      this.in = blockManager.wrapForCompression(blockId, bs);
+      this.in = serializerManager.wrapForCompression(blockId, bs);
       this.din = new DataInputStream(this.in);
       numRecords = numRecordsRemaining = din.readInt();
     } catch (IOException e) {
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillWriter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillWriter.java
index 234e21140a..9ba760e842 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillWriter.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillWriter.java
@@ -20,6 +20,7 @@ package org.apache.spark.util.collection.unsafe.sort;
 import java.io.File;
 import java.io.IOException;

+import org.apache.spark.serializer.SerializerManager;
 import scala.Tuple2;

 import org.apache.spark.executor.ShuffleWriteMetrics;
@@ -144,7 +145,7 @@ public final class UnsafeSorterSpillWriter {
     return file;
   }

-  public UnsafeSorterSpillReader getReader(BlockManager blockManager) throws IOException {
-    return new UnsafeSorterSpillReader(blockManager, file, blockId);
+  public UnsafeSorterSpillReader getReader(SerializerManager serializerManager) throws IOException {
+    return new UnsafeSorterSpillReader(serializerManager, file, blockId);
   }
 }
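A side note on what the new constructor signatures above buy in tests: a spill reader can now be exercised against a stubbed `SerializerManager` alone, with no `BlockManager` in sight. The helper below is a hypothetical sketch, not part of this patch; it assumes Mockito is available on the test classpath and that `wrapForCompression` can be stubbed to pass the stream through unchanged, and the spill file must have been produced by an `UnsafeSorterSpillWriter` beforehand.

```scala
// Hypothetical test helper (not part of this patch). Opens a previously
// written spill file without constructing a BlockManager, using a Mockito
// mock of SerializerManager that performs no compression.
import java.io.{File, InputStream}

import org.mockito.Matchers.any
import org.mockito.Mockito.{mock, when}
import org.mockito.invocation.InvocationOnMock
import org.mockito.stubbing.Answer

import org.apache.spark.serializer.SerializerManager
import org.apache.spark.storage.BlockId
import org.apache.spark.util.collection.unsafe.sort.UnsafeSorterSpillReader

object SpillReaderTestHelper {
  def openSpillForTest(spillFile: File, blockId: BlockId): UnsafeSorterSpillReader = {
    val serializerManager = mock(classOf[SerializerManager])
    // Return the wrapped stream unchanged so no compression codec is needed.
    when(serializerManager.wrapForCompression(any(classOf[BlockId]), any(classOf[InputStream])))
      .thenAnswer(new Answer[InputStream] {
        override def answer(invocation: InvocationOnMock): InputStream =
          invocation.getArguments()(1).asInstanceOf[InputStream]
      })
    new UnsafeSorterSpillReader(serializerManager, spillFile, blockId)
  }
}
```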