aboutsummaryrefslogtreecommitdiff
path: root/sql/core/src/main
diff options
context:
space:
mode:
authorJosh Rosen <joshrosen@databricks.com>2016-03-23 10:15:23 -0700
committerJosh Rosen <joshrosen@databricks.com>2016-03-23 10:15:23 -0700
commit3de24ae2ed6c58fc96a7e50832afe42fe7af34fb (patch)
tree0eb9f5d7100301195e6d0c1b77114e2398f6edb3 /sql/core/src/main
parent6ce008ba46aa1fc8a5c222ce0f25a6d81f53588e (diff)
downloadspark-3de24ae2ed6c58fc96a7e50832afe42fe7af34fb.tar.gz
spark-3de24ae2ed6c58fc96a7e50832afe42fe7af34fb.tar.bz2
spark-3de24ae2ed6c58fc96a7e50832afe42fe7af34fb.zip
[SPARK-14075] Refactor MemoryStore to be testable independent of BlockManager
This patch refactors the `MemoryStore` so that it can be tested without needing to construct / mock an entire `BlockManager`. - The block manager's serialization- and compression-related methods have been moved from `BlockManager` to `SerializerManager`. - `BlockInfoManager `is now passed directly to classes that need it, rather than being passed via the `BlockManager`. - The `MemoryStore` now calls `dropFromMemory` via a new `BlockEvictionHandler` interface rather than directly calling the `BlockManager`. This change helps to enforce a narrow interface between the `MemoryStore` and `BlockManager` functionality and makes this interface easier to mock in tests. - Several of the block unrolling tests have been moved from `BlockManagerSuite` into a new `MemoryStoreSuite`. Author: Josh Rosen <joshrosen@databricks.com> Closes #11899 from JoshRosen/reduce-memorystore-blockmanager-coupling.
Diffstat (limited to 'sql/core/src/main')
-rw-r--r--sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMap.java8
-rw-r--r--sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java7
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala1
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala1
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala1
5 files changed, 15 insertions, 3 deletions
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMap.java b/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMap.java
index acf6c583bb..8882903bbf 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMap.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMap.java
@@ -241,7 +241,11 @@ public final class UnsafeFixedWidthAggregationMap {
*/
public UnsafeKVExternalSorter destructAndCreateExternalSorter() throws IOException {
return new UnsafeKVExternalSorter(
- groupingKeySchema, aggregationBufferSchema,
- SparkEnv.get().blockManager(), map.getPageSizeBytes(), map);
+ groupingKeySchema,
+ aggregationBufferSchema,
+ SparkEnv.get().blockManager(),
+ SparkEnv.get().serializerManager(),
+ map.getPageSizeBytes(),
+ map);
}
}
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java b/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java
index 9e08675c3e..d3bfb00b3f 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java
@@ -24,6 +24,7 @@ import com.google.common.annotations.VisibleForTesting;
import org.apache.spark.TaskContext;
import org.apache.spark.memory.TaskMemoryManager;
+import org.apache.spark.serializer.SerializerManager;
import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
import org.apache.spark.sql.catalyst.expressions.codegen.BaseOrdering;
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateOrdering;
@@ -52,14 +53,16 @@ public final class UnsafeKVExternalSorter {
StructType keySchema,
StructType valueSchema,
BlockManager blockManager,
+ SerializerManager serializerManager,
long pageSizeBytes) throws IOException {
- this(keySchema, valueSchema, blockManager, pageSizeBytes, null);
+ this(keySchema, valueSchema, blockManager, serializerManager, pageSizeBytes, null);
}
public UnsafeKVExternalSorter(
StructType keySchema,
StructType valueSchema,
BlockManager blockManager,
+ SerializerManager serializerManager,
long pageSizeBytes,
@Nullable BytesToBytesMap map) throws IOException {
this.keySchema = keySchema;
@@ -77,6 +80,7 @@ public final class UnsafeKVExternalSorter {
sorter = UnsafeExternalSorter.create(
taskMemoryManager,
blockManager,
+ serializerManager,
taskContext,
recordComparator,
prefixComparator,
@@ -116,6 +120,7 @@ public final class UnsafeKVExternalSorter {
sorter = UnsafeExternalSorter.createWithExistingInMemorySorter(
taskMemoryManager,
blockManager,
+ serializerManager,
taskContext,
new KVComparator(ordering, keySchema.length()),
prefixComparator,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala
index a4c0e1c9fb..270c09aff3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala
@@ -339,6 +339,7 @@ case class Window(
sorter = UnsafeExternalSorter.create(
TaskContext.get().taskMemoryManager(),
SparkEnv.get.blockManager,
+ SparkEnv.get.serializerManager,
TaskContext.get(),
null,
null,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
index c74ac8a282..233ac263aa 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
@@ -399,6 +399,7 @@ private[sql] class DynamicPartitionWriterContainer(
sortingKeySchema,
StructType.fromAttributes(dataColumns),
SparkEnv.get.blockManager,
+ SparkEnv.get.serializerManager,
TaskContext.get().taskMemoryManager().pageSizeBytes)
while (iterator.hasNext) {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala
index fabd2fbe1e..fb65b50da8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala
@@ -41,6 +41,7 @@ class UnsafeCartesianRDD(left : RDD[UnsafeRow], right : RDD[UnsafeRow], numField
val sorter = UnsafeExternalSorter.create(
context.taskMemoryManager(),
SparkEnv.get.blockManager,
+ SparkEnv.get.serializerManager,
context,
null,
null,