author    Josh Rosen <joshrosen@databricks.com>    2016-03-23 10:15:23 -0700
committer Josh Rosen <joshrosen@databricks.com>    2016-03-23 10:15:23 -0700
commit    3de24ae2ed6c58fc96a7e50832afe42fe7af34fb (patch)
tree      0eb9f5d7100301195e6d0c1b77114e2398f6edb3 /core/src/main/java
parent    6ce008ba46aa1fc8a5c222ce0f25a6d81f53588e (diff)
[SPARK-14075] Refactor MemoryStore to be testable independent of BlockManager
This patch refactors the `MemoryStore` so that it can be tested without needing to construct / mock an entire `BlockManager`.

- The block manager's serialization- and compression-related methods have been moved from `BlockManager` to `SerializerManager`.
- `BlockInfoManager` is now passed directly to classes that need it, rather than being passed via the `BlockManager`.
- The `MemoryStore` now calls `dropFromMemory` via a new `BlockEvictionHandler` interface rather than directly calling the `BlockManager`. This change helps to enforce a narrow interface between the `MemoryStore` and `BlockManager` functionality and makes this interface easier to mock in tests.
- Several of the block unrolling tests have been moved from `BlockManagerSuite` into a new `MemoryStoreSuite`.

Author: Josh Rosen <joshrosen@databricks.com>

Closes #11899 from JoshRosen/reduce-memorystore-blockmanager-coupling.
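The `BlockEvictionHandler` seam is the key to the decoupling. As a rough illustration only — the real type is a package-private Scala trait in `org.apache.spark.storage`, so the Java shape and method signature below are assumptions, not Spark's actual API:

```java
import org.apache.spark.storage.BlockId;
import org.apache.spark.storage.StorageLevel;

// Illustrative sketch of the eviction seam described in the commit message.
// In production the handler is the BlockManager itself; in MemoryStoreSuite
// a stub can simply record which blocks were dropped.
interface BlockEvictionHandler {
  // Called by the MemoryStore when it must evict a block to free space;
  // returns the block's storage level after the drop (e.g. disk-only, or
  // none if the block was discarded entirely).
  StorageLevel dropFromMemory(BlockId blockId);
}
```

Because the `MemoryStore` depends only on this narrow interface, its unrolling and eviction behavior can be exercised with a stub handler instead of a fully wired `BlockManager`.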
Diffstat (limited to 'core/src/main/java')
-rw-r--r--  core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java                           |  7
-rw-r--r--  core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java    | 17
-rw-r--r--  core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java |  6
-rw-r--r--  core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillWriter.java |  5

4 files changed, 24 insertions, 11 deletions
diff --git a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
index de36814ecc..9aacb084f6 100644
--- a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
+++ b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
@@ -32,6 +32,7 @@ import org.apache.spark.SparkEnv;
import org.apache.spark.executor.ShuffleWriteMetrics;
import org.apache.spark.memory.MemoryConsumer;
import org.apache.spark.memory.TaskMemoryManager;
+import org.apache.spark.serializer.SerializerManager;
import org.apache.spark.storage.BlockManager;
import org.apache.spark.unsafe.Platform;
import org.apache.spark.unsafe.array.ByteArrayMethods;
@@ -163,12 +164,14 @@ public final class BytesToBytesMap extends MemoryConsumer {
private long peakMemoryUsedBytes = 0L;
private final BlockManager blockManager;
+ private final SerializerManager serializerManager;
private volatile MapIterator destructiveIterator = null;
private LinkedList<UnsafeSorterSpillWriter> spillWriters = new LinkedList<>();
public BytesToBytesMap(
TaskMemoryManager taskMemoryManager,
BlockManager blockManager,
+ SerializerManager serializerManager,
int initialCapacity,
double loadFactor,
long pageSizeBytes,
@@ -176,6 +179,7 @@ public final class BytesToBytesMap extends MemoryConsumer {
super(taskMemoryManager, pageSizeBytes);
this.taskMemoryManager = taskMemoryManager;
this.blockManager = blockManager;
+ this.serializerManager = serializerManager;
this.loadFactor = loadFactor;
this.loc = new Location();
this.pageSizeBytes = pageSizeBytes;
@@ -209,6 +213,7 @@ public final class BytesToBytesMap extends MemoryConsumer {
this(
taskMemoryManager,
SparkEnv.get() != null ? SparkEnv.get().blockManager() : null,
+ SparkEnv.get() != null ? SparkEnv.get().serializerManager() : null,
initialCapacity,
0.70,
pageSizeBytes,
@@ -271,7 +276,7 @@ public final class BytesToBytesMap extends MemoryConsumer {
}
try {
Closeables.close(reader, /* swallowIOException = */ false);
- reader = spillWriters.getFirst().getReader(blockManager);
+ reader = spillWriters.getFirst().getReader(serializerManager);
recordsInPage = -1;
} catch (IOException e) {
// Scala iterator does not handle exception
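The practical effect of the hunks above: reading spilled map data back now needs only a `SerializerManager` to decompress the spill stream. A minimal sketch of building one standalone, assuming the two-argument constructor of this era (`SerializerManager(defaultSerializer, conf)`):

```java
import org.apache.spark.SparkConf;
import org.apache.spark.serializer.JavaSerializer;
import org.apache.spark.serializer.SerializerManager;

// No BlockManager required: a SerializerManager can be constructed from a
// bare SparkConf, which is what lets the spill-reading path be tested in
// isolation. The constructor arity here is an assumption for this sketch.
SparkConf conf = new SparkConf();
SerializerManager serializerManager =
    new SerializerManager(new JavaSerializer(conf), conf);
```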
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
index 927b19c4e8..ded8f0472b 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
@@ -31,6 +31,7 @@ import org.apache.spark.TaskContext;
import org.apache.spark.executor.ShuffleWriteMetrics;
import org.apache.spark.memory.MemoryConsumer;
import org.apache.spark.memory.TaskMemoryManager;
+import org.apache.spark.serializer.SerializerManager;
import org.apache.spark.storage.BlockManager;
import org.apache.spark.unsafe.Platform;
import org.apache.spark.unsafe.array.LongArray;
@@ -51,6 +52,7 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
private final RecordComparator recordComparator;
private final TaskMemoryManager taskMemoryManager;
private final BlockManager blockManager;
+ private final SerializerManager serializerManager;
private final TaskContext taskContext;
private ShuffleWriteMetrics writeMetrics;
@@ -78,6 +80,7 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
public static UnsafeExternalSorter createWithExistingInMemorySorter(
TaskMemoryManager taskMemoryManager,
BlockManager blockManager,
+ SerializerManager serializerManager,
TaskContext taskContext,
RecordComparator recordComparator,
PrefixComparator prefixComparator,
@@ -85,7 +88,8 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
long pageSizeBytes,
UnsafeInMemorySorter inMemorySorter) throws IOException {
UnsafeExternalSorter sorter = new UnsafeExternalSorter(taskMemoryManager, blockManager,
- taskContext, recordComparator, prefixComparator, initialSize, pageSizeBytes, inMemorySorter);
+ serializerManager, taskContext, recordComparator, prefixComparator, initialSize,
+ pageSizeBytes, inMemorySorter);
sorter.spill(Long.MAX_VALUE, sorter);
// The external sorter will be used to insert records, in-memory sorter is not needed.
sorter.inMemSorter = null;
@@ -95,18 +99,20 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
public static UnsafeExternalSorter create(
TaskMemoryManager taskMemoryManager,
BlockManager blockManager,
+ SerializerManager serializerManager,
TaskContext taskContext,
RecordComparator recordComparator,
PrefixComparator prefixComparator,
int initialSize,
long pageSizeBytes) {
- return new UnsafeExternalSorter(taskMemoryManager, blockManager,
+ return new UnsafeExternalSorter(taskMemoryManager, blockManager, serializerManager,
taskContext, recordComparator, prefixComparator, initialSize, pageSizeBytes, null);
}
private UnsafeExternalSorter(
TaskMemoryManager taskMemoryManager,
BlockManager blockManager,
+ SerializerManager serializerManager,
TaskContext taskContext,
RecordComparator recordComparator,
PrefixComparator prefixComparator,
@@ -116,6 +122,7 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
super(taskMemoryManager, pageSizeBytes);
this.taskMemoryManager = taskMemoryManager;
this.blockManager = blockManager;
+ this.serializerManager = serializerManager;
this.taskContext = taskContext;
this.recordComparator = recordComparator;
this.prefixComparator = prefixComparator;
@@ -412,7 +419,7 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
final UnsafeSorterSpillMerger spillMerger =
new UnsafeSorterSpillMerger(recordComparator, prefixComparator, spillWriters.size());
for (UnsafeSorterSpillWriter spillWriter : spillWriters) {
- spillMerger.addSpillIfNotEmpty(spillWriter.getReader(blockManager));
+ spillMerger.addSpillIfNotEmpty(spillWriter.getReader(serializerManager));
}
if (inMemSorter != null) {
readingIterator = new SpillableIterator(inMemSorter.getSortedIterator());
@@ -463,7 +470,7 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
}
spillWriter.close();
spillWriters.add(spillWriter);
- nextUpstream = spillWriter.getReader(blockManager);
+ nextUpstream = spillWriter.getReader(serializerManager);
long released = 0L;
synchronized (UnsafeExternalSorter.this) {
@@ -549,7 +556,7 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
} else {
LinkedList<UnsafeSorterIterator> queue = new LinkedList<>();
for (UnsafeSorterSpillWriter spillWriter : spillWriters) {
- queue.add(spillWriter.getReader(blockManager));
+ queue.add(spillWriter.getReader(serializerManager));
}
if (inMemSorter != null) {
queue.add(inMemSorter.getSortedIterator());
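Taken together, these hunks change every `UnsafeExternalSorter` call site in the same mechanical way. A sketch of a caller after the change, with the parameter order taken from the `create` signature above; the local variables (`taskMemoryManager`, `taskContext`, the comparators, `pageSizeBytes`) are assumed to be in scope, and 4096 is an arbitrary example capacity, not a value from the patch:

```java
// Both managers now come from SparkEnv (or from test doubles), mirroring
// the pattern used by BytesToBytesMap's convenience constructor.
UnsafeExternalSorter sorter = UnsafeExternalSorter.create(
    taskMemoryManager,
    SparkEnv.get().blockManager(),
    SparkEnv.get().serializerManager(),
    taskContext,
    recordComparator,
    prefixComparator,
    4096,            // initialSize: example in-memory sorter capacity
    pageSizeBytes);
```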
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java
index 20ee1c8eb0..1d588c37c5 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java
@@ -22,8 +22,8 @@ import java.io.*;
import com.google.common.io.ByteStreams;
import com.google.common.io.Closeables;
+import org.apache.spark.serializer.SerializerManager;
import org.apache.spark.storage.BlockId;
-import org.apache.spark.storage.BlockManager;
import org.apache.spark.unsafe.Platform;
/**
@@ -46,13 +46,13 @@ public final class UnsafeSorterSpillReader extends UnsafeSorterIterator implemen
private final long baseOffset = Platform.BYTE_ARRAY_OFFSET;
public UnsafeSorterSpillReader(
- BlockManager blockManager,
+ SerializerManager serializerManager,
File file,
BlockId blockId) throws IOException {
assert (file.length() > 0);
final BufferedInputStream bs = new BufferedInputStream(new FileInputStream(file));
try {
- this.in = blockManager.wrapForCompression(blockId, bs);
+ this.in = serializerManager.wrapForCompression(blockId, bs);
this.din = new DataInputStream(this.in);
numRecords = numRecordsRemaining = din.readInt();
} catch (IOException e) {
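This constructor change is where the decoupling pays off: a spill file can be read back given nothing but a `SerializerManager`. A hedged sketch of direct use in a test — `serializerManager` as built earlier, `spillFile` assumed to point at a previously written spill, and `TempLocalBlockId` used as a throwaway block id:

```java
import java.util.UUID;
import org.apache.spark.storage.TempLocalBlockId;
import org.apache.spark.util.collection.unsafe.sort.UnsafeSorterSpillReader;

// Throws IOException if the spill file cannot be opened or is corrupt.
// Internally the reader wraps the file stream with
// serializerManager.wrapForCompression(blockId, in), per the hunk above.
UnsafeSorterSpillReader reader = new UnsafeSorterSpillReader(
    serializerManager, spillFile, new TempLocalBlockId(UUID.randomUUID()));
```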
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillWriter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillWriter.java
index 234e21140a..9ba760e842 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillWriter.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillWriter.java
@@ -20,6 +20,7 @@ package org.apache.spark.util.collection.unsafe.sort;
import java.io.File;
import java.io.IOException;
+import org.apache.spark.serializer.SerializerManager;
import scala.Tuple2;
import org.apache.spark.executor.ShuffleWriteMetrics;
@@ -144,7 +145,7 @@ public final class UnsafeSorterSpillWriter {
return file;
}
- public UnsafeSorterSpillReader getReader(BlockManager blockManager) throws IOException {
- return new UnsafeSorterSpillReader(blockManager, file, blockId);
+ public UnsafeSorterSpillReader getReader(SerializerManager serializerManager) throws IOException {
+ return new UnsafeSorterSpillReader(serializerManager, file, blockId);
}
}