author | Josh Rosen <joshrosen@databricks.com> | 2016-03-23 10:15:23 -0700
committer | Josh Rosen <joshrosen@databricks.com> | 2016-03-23 10:15:23 -0700
commit | 3de24ae2ed6c58fc96a7e50832afe42fe7af34fb (patch)
tree | 0eb9f5d7100301195e6d0c1b77114e2398f6edb3 /sql/core/src/main
parent | 6ce008ba46aa1fc8a5c222ce0f25a6d81f53588e (diff)
download | spark-3de24ae2ed6c58fc96a7e50832afe42fe7af34fb.tar.gz, spark-3de24ae2ed6c58fc96a7e50832afe42fe7af34fb.tar.bz2, spark-3de24ae2ed6c58fc96a7e50832afe42fe7af34fb.zip
[SPARK-14075] Refactor MemoryStore to be testable independent of BlockManager
This patch refactors the `MemoryStore` so that it can be tested without needing to construct / mock an entire `BlockManager`.
- The block manager's serialization- and compression-related methods have been moved from `BlockManager` to `SerializerManager`.
- `BlockInfoManager` is now passed directly to classes that need it, rather than being passed via the `BlockManager`.
- The `MemoryStore` now calls `dropFromMemory` via a new `BlockEvictionHandler` interface rather than calling the `BlockManager` directly. This enforces a narrow interface between the `MemoryStore` and `BlockManager` functionality and makes that interface easier to mock in tests (a simplified sketch follows this list).
- Several of the block unrolling tests have been moved from `BlockManagerSuite` into a new `MemoryStoreSuite`.
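To illustrate the shape of the decoupling described above, here is a minimal, self-contained sketch. It is not Spark's actual code: the real `BlockEvictionHandler` and `MemoryStore` have richer signatures (class tags, storage levels, memory modes), and the `BlockId` stand-in, the eviction policy, and the test stub below are simplified assumptions made only for illustration.

// Minimal sketch (not Spark's actual code): the eviction callback is the only
// path from the store back into block-management code, so tests can stub it.

// Simplified stand-in for org.apache.spark.storage.BlockId.
case class BlockId(name: String)

// Narrow interface through which the store asks for a block to be dropped,
// instead of calling back into a full BlockManager.
trait BlockEvictionHandler {
  def dropFromMemory(blockId: BlockId): Unit
}

// The store receives its collaborator directly, so a test can pass a fake.
class MemoryStore(maxMemory: Long, evictionHandler: BlockEvictionHandler) {
  private val entries = scala.collection.mutable.LinkedHashMap.empty[BlockId, Long]
  private def used: Long = entries.values.sum

  def putBytes(blockId: BlockId, size: Long): Boolean = {
    // Evict the oldest entries until the new block fits (simplified policy).
    while (used + size > maxMemory && entries.nonEmpty) {
      val (victim, _) = entries.head
      entries.remove(victim)
      evictionHandler.dropFromMemory(victim) // the one narrow callback
    }
    if (used + size <= maxMemory) { entries(blockId) = size; true } else false
  }
}

// Test-side usage: record evictions without constructing a BlockManager.
object MemoryStoreSketch extends App {
  val dropped = scala.collection.mutable.ArrayBuffer.empty[BlockId]
  val store = new MemoryStore(100L, new BlockEvictionHandler {
    override def dropFromMemory(blockId: BlockId): Unit = dropped += blockId
  })
  store.putBytes(BlockId("a"), 60L)
  store.putBytes(BlockId("b"), 60L) // does not fit, so "a" is evicted first
  assert(dropped == Seq(BlockId("a")))
}

The point of the pattern is that the store's only callback into block-management code is the one-method handler, which a suite such as `MemoryStoreSuite` can replace with a recording stub instead of a full `BlockManager`.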
Author: Josh Rosen <joshrosen@databricks.com>
Closes #11899 from JoshRosen/reduce-memorystore-blockmanager-coupling.
Diffstat (limited to 'sql/core/src/main')
5 files changed, 15 insertions(+), 3 deletions(-)
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMap.java b/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMap.java
index acf6c583bb..8882903bbf 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMap.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMap.java
@@ -241,7 +241,11 @@ public final class UnsafeFixedWidthAggregationMap {
    */
   public UnsafeKVExternalSorter destructAndCreateExternalSorter() throws IOException {
     return new UnsafeKVExternalSorter(
-      groupingKeySchema, aggregationBufferSchema,
-      SparkEnv.get().blockManager(), map.getPageSizeBytes(), map);
+      groupingKeySchema,
+      aggregationBufferSchema,
+      SparkEnv.get().blockManager(),
+      SparkEnv.get().serializerManager(),
+      map.getPageSizeBytes(),
+      map);
   }
 }
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java b/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java
index 9e08675c3e..d3bfb00b3f 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java
@@ -24,6 +24,7 @@ import com.google.common.annotations.VisibleForTesting;
 
 import org.apache.spark.TaskContext;
 import org.apache.spark.memory.TaskMemoryManager;
+import org.apache.spark.serializer.SerializerManager;
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
 import org.apache.spark.sql.catalyst.expressions.codegen.BaseOrdering;
 import org.apache.spark.sql.catalyst.expressions.codegen.GenerateOrdering;
@@ -52,14 +53,16 @@ public final class UnsafeKVExternalSorter {
       StructType keySchema,
       StructType valueSchema,
       BlockManager blockManager,
+      SerializerManager serializerManager,
       long pageSizeBytes) throws IOException {
-    this(keySchema, valueSchema, blockManager, pageSizeBytes, null);
+    this(keySchema, valueSchema, blockManager, serializerManager, pageSizeBytes, null);
   }
 
   public UnsafeKVExternalSorter(
       StructType keySchema,
       StructType valueSchema,
       BlockManager blockManager,
+      SerializerManager serializerManager,
       long pageSizeBytes,
       @Nullable BytesToBytesMap map) throws IOException {
     this.keySchema = keySchema;
@@ -77,6 +80,7 @@ public final class UnsafeKVExternalSorter {
       sorter = UnsafeExternalSorter.create(
         taskMemoryManager,
         blockManager,
+        serializerManager,
         taskContext,
         recordComparator,
         prefixComparator,
@@ -116,6 +120,7 @@ public final class UnsafeKVExternalSorter {
       sorter = UnsafeExternalSorter.createWithExistingInMemorySorter(
         taskMemoryManager,
         blockManager,
+        serializerManager,
         taskContext,
         new KVComparator(ordering, keySchema.length()),
         prefixComparator,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala
index a4c0e1c9fb..270c09aff3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala
@@ -339,6 +339,7 @@ case class Window(
           sorter = UnsafeExternalSorter.create(
             TaskContext.get().taskMemoryManager(),
             SparkEnv.get.blockManager,
+            SparkEnv.get.serializerManager,
             TaskContext.get(),
             null,
             null,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
index c74ac8a282..233ac263aa 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
@@ -399,6 +399,7 @@ private[sql] class DynamicPartitionWriterContainer(
         sortingKeySchema,
         StructType.fromAttributes(dataColumns),
         SparkEnv.get.blockManager,
+        SparkEnv.get.serializerManager,
         TaskContext.get().taskMemoryManager().pageSizeBytes)
 
       while (iterator.hasNext) {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala
index fabd2fbe1e..fb65b50da8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala
@@ -41,6 +41,7 @@ class UnsafeCartesianRDD(left : RDD[UnsafeRow], right : RDD[UnsafeRow], numField
     val sorter = UnsafeExternalSorter.create(
       context.taskMemoryManager(),
       SparkEnv.get.blockManager,
+      SparkEnv.get.serializerManager,
      context,
       null,
       null,