author     Josh Rosen <joshrosen@databricks.com>  2015-10-25 21:19:52 -0700
committer  Josh Rosen <joshrosen@databricks.com>  2015-10-25 21:19:52 -0700
commit     85e654c5ec87e666a8845bfd77185c1ea57b268a (patch)
tree       2beadbc8fbb54369325970a4e2c7189506efad89 /sql/core
parent     63accc79625d8a03d0624717af5e1d81b18a6da3 (diff)
[SPARK-10984] Simplify *MemoryManager class structure
This patch refactors the MemoryManager class structure. After #9000, Spark had the following classes:

- MemoryManager
- StaticMemoryManager
- ExecutorMemoryManager
- TaskMemoryManager
- ShuffleMemoryManager

This is fairly confusing. To simplify things, this patch consolidates several of these classes:

- ShuffleMemoryManager and ExecutorMemoryManager were merged into MemoryManager.
- TaskMemoryManager is moved into Spark Core.

**Key changes and tasks**:

- [x] Merge ExecutorMemoryManager into MemoryManager.
  - [x] Move pooling logic into Allocator.
- [x] Move TaskMemoryManager from `spark-unsafe` to `spark-core`.
- [x] Refactor the existing Tungsten TaskMemoryManager interactions so that Tungsten code uses only this and not both this and ShuffleMemoryManager.
- [x] Refactor non-Tungsten code to use the TaskMemoryManager instead of ShuffleMemoryManager.
- [x] Merge ShuffleMemoryManager into MemoryManager.
  - [x] Move code.
  - [x] ~~Simplify 1/n calculation.~~ **Will defer to a follow-up, since this needs more work.**
- [x] Port ShuffleMemoryManagerSuite tests.
- [x] Move classes from `unsafe` package to `memory` package.
- [ ] Figure out how to handle the hacky use of the memory managers in HashedRelation's broadcast variable construction.
- [x] Test porting and cleanup: several tests relied on mock functionality (such as `TestShuffleMemoryManager.markAsOutOfMemory`) which has been changed or broken during the memory manager consolidation.
  - [x] AbstractBytesToBytesMapSuite
  - [x] UnsafeExternalSorterSuite
  - [x] UnsafeFixedWidthAggregationMapSuite
  - [x] UnsafeKVExternalSorterSuite

**Compatibility notes**:

- This patch introduces breaking changes in `ExternalAppendOnlyMap`, which is marked as `DeveloperApi` (likely for legacy reasons): this class now cannot be used outside of a task.

Author: Josh Rosen <joshrosen@databricks.com>

Closes #9127 from JoshRosen/SPARK-10984.
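As context for the diff below, here is a rough sketch of the consolidated call shape. This block is illustrative only, not part of the patch; the names mirror the call sites changed in this commit, and it assumes it runs inside an active task.

```scala
import org.apache.spark.TaskContext
import org.apache.spark.unsafe.map.BytesToBytesMap

// The TaskMemoryManager obtained from the task context is now the single entry point.
val taskMemoryManager = TaskContext.get().taskMemoryManager()
// Page size is read from the task memory manager rather than SparkEnv.get.shuffleMemoryManager.
val pageSizeBytes = taskMemoryManager.pageSizeBytes

// Tungsten structures no longer take a separate ShuffleMemoryManager argument:
val map = new BytesToBytesMap(taskMemoryManager, 1024, pageSizeBytes, false)
```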
Diffstat (limited to 'sql/core')
-rw-r--r--  sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMap.java | 12
-rw-r--r--  sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java | 22
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala | 9
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala | 3
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala | 21
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/sort.scala | 5
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/TestShuffleMemoryManager.scala | 75
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMapSuite.scala | 54
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeKVExternalSorterSuite.scala | 19
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala | 10
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIteratorSuite.scala | 4
11 files changed, 70 insertions, 164 deletions
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMap.java b/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMap.java
index 09511ff35f..82c645df28 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMap.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMap.java
@@ -22,7 +22,6 @@ import java.io.IOException;
import com.google.common.annotations.VisibleForTesting;
import org.apache.spark.SparkEnv;
-import org.apache.spark.shuffle.ShuffleMemoryManager;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.expressions.UnsafeProjection;
import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
@@ -32,7 +31,7 @@ import org.apache.spark.unsafe.KVIterator;
import org.apache.spark.unsafe.Platform;
import org.apache.spark.unsafe.map.BytesToBytesMap;
import org.apache.spark.unsafe.memory.MemoryLocation;
-import org.apache.spark.unsafe.memory.TaskMemoryManager;
+import org.apache.spark.memory.TaskMemoryManager;
/**
* Unsafe-based HashMap for performing aggregations where the aggregated values are fixed-width.
@@ -88,8 +87,6 @@ public final class UnsafeFixedWidthAggregationMap {
* @param aggregationBufferSchema the schema of the aggregation buffer, used for row conversion.
* @param groupingKeySchema the schema of the grouping key, used for row conversion.
* @param taskMemoryManager the memory manager used to allocate our Unsafe memory structures.
- * @param shuffleMemoryManager the shuffle memory manager, for coordinating our memory usage with
- * other tasks.
* @param initialCapacity the initial capacity of the map (a sizing hint to avoid re-hashing).
* @param pageSizeBytes the data page size, in bytes; limits the maximum record size.
* @param enablePerfMetrics if true, performance metrics will be recorded (has minor perf impact)
@@ -99,15 +96,14 @@ public final class UnsafeFixedWidthAggregationMap {
StructType aggregationBufferSchema,
StructType groupingKeySchema,
TaskMemoryManager taskMemoryManager,
- ShuffleMemoryManager shuffleMemoryManager,
int initialCapacity,
long pageSizeBytes,
boolean enablePerfMetrics) {
this.aggregationBufferSchema = aggregationBufferSchema;
this.groupingKeyProjection = UnsafeProjection.create(groupingKeySchema);
this.groupingKeySchema = groupingKeySchema;
- this.map = new BytesToBytesMap(
- taskMemoryManager, shuffleMemoryManager, initialCapacity, pageSizeBytes, enablePerfMetrics);
+ this.map =
+ new BytesToBytesMap(taskMemoryManager, initialCapacity, pageSizeBytes, enablePerfMetrics);
this.enablePerfMetrics = enablePerfMetrics;
// Initialize the buffer for aggregation value
@@ -256,7 +252,7 @@ public final class UnsafeFixedWidthAggregationMap {
public UnsafeKVExternalSorter destructAndCreateExternalSorter() throws IOException {
UnsafeKVExternalSorter sorter = new UnsafeKVExternalSorter(
groupingKeySchema, aggregationBufferSchema,
- SparkEnv.get().blockManager(), map.getShuffleMemoryManager(), map.getPageSizeBytes(), map);
+ SparkEnv.get().blockManager(), map.getPageSizeBytes(), map);
return sorter;
}
}
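For reference, a hedged usage sketch of the new `UnsafeFixedWidthAggregationMap` constructor; it mirrors the `TungstenAggregationIterator` call site further down in this diff, and the buffer and schema values are placeholders rather than definitions from this file.

```scala
val map = new UnsafeFixedWidthAggregationMap(
  initialAggregationBuffer,                             // empty aggregation buffer (InternalRow)
  aggregationBufferSchema,
  groupingKeySchema,
  TaskContext.get().taskMemoryManager(),                // the removed shuffleMemoryManager argument is gone
  1024 * 16,                                            // initial capacity
  TaskContext.get().taskMemoryManager().pageSizeBytes,  // page size, now read from the task memory manager
  false                                                 // disable perf metrics
)
```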
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java b/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java
index 9df5780e4f..46301f0042 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java
@@ -24,7 +24,6 @@ import javax.annotation.Nullable;
import com.google.common.annotations.VisibleForTesting;
import org.apache.spark.TaskContext;
-import org.apache.spark.shuffle.ShuffleMemoryManager;
import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
import org.apache.spark.sql.catalyst.expressions.codegen.BaseOrdering;
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateOrdering;
@@ -34,7 +33,7 @@ import org.apache.spark.unsafe.KVIterator;
import org.apache.spark.unsafe.Platform;
import org.apache.spark.unsafe.map.BytesToBytesMap;
import org.apache.spark.unsafe.memory.MemoryBlock;
-import org.apache.spark.unsafe.memory.TaskMemoryManager;
+import org.apache.spark.memory.TaskMemoryManager;
import org.apache.spark.util.collection.unsafe.sort.*;
/**
@@ -50,14 +49,19 @@ public final class UnsafeKVExternalSorter {
private final UnsafeExternalRowSorter.PrefixComputer prefixComputer;
private final UnsafeExternalSorter sorter;
- public UnsafeKVExternalSorter(StructType keySchema, StructType valueSchema,
- BlockManager blockManager, ShuffleMemoryManager shuffleMemoryManager, long pageSizeBytes)
- throws IOException {
- this(keySchema, valueSchema, blockManager, shuffleMemoryManager, pageSizeBytes, null);
+ public UnsafeKVExternalSorter(
+ StructType keySchema,
+ StructType valueSchema,
+ BlockManager blockManager,
+ long pageSizeBytes) throws IOException {
+ this(keySchema, valueSchema, blockManager, pageSizeBytes, null);
}
- public UnsafeKVExternalSorter(StructType keySchema, StructType valueSchema,
- BlockManager blockManager, ShuffleMemoryManager shuffleMemoryManager, long pageSizeBytes,
+ public UnsafeKVExternalSorter(
+ StructType keySchema,
+ StructType valueSchema,
+ BlockManager blockManager,
+ long pageSizeBytes,
@Nullable BytesToBytesMap map) throws IOException {
this.keySchema = keySchema;
this.valueSchema = valueSchema;
@@ -73,7 +77,6 @@ public final class UnsafeKVExternalSorter {
if (map == null) {
sorter = UnsafeExternalSorter.create(
taskMemoryManager,
- shuffleMemoryManager,
blockManager,
taskContext,
recordComparator,
@@ -115,7 +118,6 @@ public final class UnsafeKVExternalSorter {
sorter = UnsafeExternalSorter.createWithExistingInMemorySorter(
taskContext.taskMemoryManager(),
- shuffleMemoryManager,
blockManager,
taskContext,
new KVComparator(ordering, keySchema.length()),
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
index 7cd0f7b81e..fb2fc98e34 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.aggregate
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.unsafe.KVIterator
-import org.apache.spark.{InternalAccumulator, Logging, SparkEnv, TaskContext}
+import org.apache.spark.{InternalAccumulator, Logging, TaskContext}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate._
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeRowJoiner
@@ -34,7 +34,7 @@ import org.apache.spark.sql.types.StructType
*
* This iterator first uses hash-based aggregation to process input rows. It uses
* a hash map to store groups and their corresponding aggregation buffers. If we
- * this map cannot allocate memory from [[org.apache.spark.shuffle.ShuffleMemoryManager]],
+ * this map cannot allocate memory from memory manager,
* it switches to sort-based aggregation. The process of the switch has the following step:
* - Step 1: Sort all entries of the hash map based on values of grouping expressions and
* spill them to disk.
@@ -480,10 +480,9 @@ class TungstenAggregationIterator(
initialAggregationBuffer,
StructType.fromAttributes(allAggregateFunctions.flatMap(_.aggBufferAttributes)),
StructType.fromAttributes(groupingExpressions.map(_.toAttribute)),
- TaskContext.get.taskMemoryManager(),
- SparkEnv.get.shuffleMemoryManager,
+ TaskContext.get().taskMemoryManager(),
1024 * 16, // initial capacity
- SparkEnv.get.shuffleMemoryManager.pageSizeBytes,
+ TaskContext.get().taskMemoryManager().pageSizeBytes,
false // disable tracking of performance metrics
)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
index cfd64c1d9e..1b59b19d94 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
@@ -344,8 +344,7 @@ private[sql] class DynamicPartitionWriterContainer(
StructType.fromAttributes(partitionColumns),
StructType.fromAttributes(dataColumns),
SparkEnv.get.blockManager,
- SparkEnv.get.shuffleMemoryManager,
- SparkEnv.get.shuffleMemoryManager.pageSizeBytes)
+ TaskContext.get().taskMemoryManager().pageSizeBytes)
sorter.insertKV(currentKey, getOutputRow(inputRow))
}
} else {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
index bc255b2750..cc8abb1ba4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
@@ -21,7 +21,7 @@ import java.io.{Externalizable, IOException, ObjectInput, ObjectOutput}
import java.nio.ByteOrder
import java.util.{HashMap => JavaHashMap}
-import org.apache.spark.shuffle.ShuffleMemoryManager
+import org.apache.spark.memory.{TaskMemoryManager, StaticMemoryManager}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.execution.SparkSqlSerializer
@@ -29,7 +29,7 @@ import org.apache.spark.sql.execution.local.LocalNode
import org.apache.spark.sql.execution.metric.{LongSQLMetric, SQLMetrics}
import org.apache.spark.unsafe.Platform
import org.apache.spark.unsafe.map.BytesToBytesMap
-import org.apache.spark.unsafe.memory.{MemoryLocation, ExecutorMemoryManager, MemoryAllocator, TaskMemoryManager}
+import org.apache.spark.unsafe.memory.MemoryLocation
import org.apache.spark.util.Utils
import org.apache.spark.util.collection.CompactBuffer
import org.apache.spark.{SparkConf, SparkEnv}
@@ -320,21 +320,20 @@ private[joins] final class UnsafeHashedRelation(
override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException {
val nKeys = in.readInt()
// This is used in Broadcast, shared by multiple tasks, so we use on-heap memory
- val taskMemoryManager = new TaskMemoryManager(new ExecutorMemoryManager(MemoryAllocator.HEAP))
+ // TODO(josh): This needs to be revisited before we merge this patch; making this change now
+ // so that tests compile:
+ val taskMemoryManager = new TaskMemoryManager(
+ new StaticMemoryManager(
+ new SparkConf().set("spark.unsafe.offHeap", "false"), Long.MaxValue, Long.MaxValue, 1), 0)
- val pageSizeBytes = Option(SparkEnv.get).map(_.shuffleMemoryManager.pageSizeBytes)
+ val pageSizeBytes = Option(SparkEnv.get).map(_.memoryManager.pageSizeBytes)
.getOrElse(new SparkConf().getSizeAsBytes("spark.buffer.pageSize", "16m"))
- // Dummy shuffle memory manager which always grants all memory allocation requests.
- // We use this because it doesn't make sense count shared broadcast variables' memory usage
- // towards individual tasks' quotas. In the future, we should devise a better way of handling
- // this.
- val shuffleMemoryManager =
- ShuffleMemoryManager.create(maxMemory = Long.MaxValue, pageSizeBytes = pageSizeBytes)
+ // TODO(josh): We won't need this dummy memory manager after future refactorings; revisit
+ // during code review
binaryMap = new BytesToBytesMap(
taskMemoryManager,
- shuffleMemoryManager,
(nKeys * 1.5 + 1).toInt, // reduce hash collision
pageSizeBytes)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/sort.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/sort.scala
index 9385e5734d..dd92dda480 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/sort.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/sort.scala
@@ -49,7 +49,8 @@ case class Sort(
protected override def doExecute(): RDD[InternalRow] = attachTree(this, "sort") {
child.execute().mapPartitions( { iterator =>
val ordering = newOrdering(sortOrder, child.output)
- val sorter = new ExternalSorter[InternalRow, Null, InternalRow](ordering = Some(ordering))
+ val sorter = new ExternalSorter[InternalRow, Null, InternalRow](
+ TaskContext.get(), ordering = Some(ordering))
sorter.insertAll(iterator.map(r => (r.copy(), null)))
val baseIterator = sorter.iterator.map(_._1)
val context = TaskContext.get()
@@ -124,7 +125,7 @@ case class TungstenSort(
}
}
- val pageSize = SparkEnv.get.shuffleMemoryManager.pageSizeBytes
+ val pageSize = SparkEnv.get.memoryManager.pageSizeBytes
val sorter = new UnsafeExternalRowSorter(
schema, ordering, prefixComparator, prefixComputer, pageSize)
if (testSpillFrequency > 0) {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/TestShuffleMemoryManager.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/TestShuffleMemoryManager.scala
deleted file mode 100644
index c4358f409b..0000000000
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/TestShuffleMemoryManager.scala
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution
-
-import scala.collection.mutable
-
-import org.apache.spark.memory.MemoryManager
-import org.apache.spark.shuffle.ShuffleMemoryManager
-import org.apache.spark.storage.{BlockId, BlockStatus}
-
-
-/**
- * A [[ShuffleMemoryManager]] that can be controlled to run out of memory.
- */
-class TestShuffleMemoryManager
- extends ShuffleMemoryManager(new GrantEverythingMemoryManager, 4 * 1024 * 1024) {
- private var oom = false
-
- override def tryToAcquire(numBytes: Long): Long = {
- if (oom) {
- oom = false
- 0
- } else {
- // Uncomment the following to trace memory allocations.
- // println(s"tryToAcquire $numBytes in " +
- // Thread.currentThread().getStackTrace.mkString("", "\n -", ""))
- val acquired = super.tryToAcquire(numBytes)
- acquired
- }
- }
-
- override def release(numBytes: Long): Unit = {
- // Uncomment the following to trace memory releases.
- // println(s"release $numBytes in " +
- // Thread.currentThread().getStackTrace.mkString("", "\n -", ""))
- super.release(numBytes)
- }
-
- def markAsOutOfMemory(): Unit = {
- oom = true
- }
-}
-
-private class GrantEverythingMemoryManager extends MemoryManager {
- override def acquireExecutionMemory(
- numBytes: Long,
- evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Long = numBytes
- override def acquireStorageMemory(
- blockId: BlockId,
- numBytes: Long,
- evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = true
- override def acquireUnrollMemory(
- blockId: BlockId,
- numBytes: Long,
- evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = true
- override def releaseExecutionMemory(numBytes: Long): Unit = { }
- override def releaseStorageMemory(numBytes: Long): Unit = { }
- override def maxExecutionMemory: Long = Long.MaxValue
- override def maxStorageMemory: Long = Long.MaxValue
-}
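The helper deleted above is superseded in the suites below by `GrantEverythingMemoryManager`. A minimal sketch of the replacement test setup, assuming an on-heap configuration as in the updated suites:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.memory.{GrantEverythingMemoryManager, TaskMemoryManager}

// The pieces the updated suites wire together in place of TestShuffleMemoryManager.
val conf = new SparkConf().set("spark.unsafe.offHeap", "false")
val memoryManager = new GrantEverythingMemoryManager(conf)
val taskMemoryManager = new TaskMemoryManager(memoryManager, 0)

// Where TestShuffleMemoryManager.markAsOutOfMemory() was used, the suites now flag the
// shared manager so the next execution-memory request is denied, forcing a spill:
memoryManager.markExecutionAsOutOfMemory()
```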
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMapSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMapSuite.scala
index 1739798a24..dbf4863b76 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMapSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMapSuite.scala
@@ -23,13 +23,12 @@ import scala.util.{Try, Random}
import org.scalatest.Matchers
-import org.apache.spark.{TaskContextImpl, TaskContext, SparkFunSuite}
-import org.apache.spark.shuffle.ShuffleMemoryManager
+import org.apache.spark.{SparkConf, TaskContextImpl, TaskContext, SparkFunSuite}
+import org.apache.spark.memory.{TaskMemoryManager, GrantEverythingMemoryManager}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{UnsafeRow, UnsafeProjection}
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.sql.types._
-import org.apache.spark.unsafe.memory.{ExecutorMemoryManager, MemoryAllocator, TaskMemoryManager}
import org.apache.spark.unsafe.types.UTF8String
/**
@@ -49,23 +48,22 @@ class UnsafeFixedWidthAggregationMapSuite
private def emptyAggregationBuffer: InternalRow = InternalRow(0)
private val PAGE_SIZE_BYTES: Long = 1L << 26; // 64 megabytes
+ private var memoryManager: GrantEverythingMemoryManager = null
private var taskMemoryManager: TaskMemoryManager = null
- private var shuffleMemoryManager: TestShuffleMemoryManager = null
def testWithMemoryLeakDetection(name: String)(f: => Unit) {
def cleanup(): Unit = {
if (taskMemoryManager != null) {
- val leakedShuffleMemory = shuffleMemoryManager.getMemoryConsumptionForThisTask()
assert(taskMemoryManager.cleanUpAllAllocatedMemory() === 0)
- assert(leakedShuffleMemory === 0)
taskMemoryManager = null
}
TaskContext.unset()
}
test(name) {
- taskMemoryManager = new TaskMemoryManager(new ExecutorMemoryManager(MemoryAllocator.HEAP))
- shuffleMemoryManager = new TestShuffleMemoryManager
+ val conf = new SparkConf().set("spark.unsafe.offHeap", "false")
+ memoryManager = new GrantEverythingMemoryManager(conf)
+ taskMemoryManager = new TaskMemoryManager(memoryManager, 0)
TaskContext.setTaskContext(new TaskContextImpl(
stageId = 0,
@@ -110,7 +108,6 @@ class UnsafeFixedWidthAggregationMapSuite
aggBufferSchema,
groupKeySchema,
taskMemoryManager,
- shuffleMemoryManager,
1024, // initial capacity,
PAGE_SIZE_BYTES,
false // disable perf metrics
@@ -125,7 +122,6 @@ class UnsafeFixedWidthAggregationMapSuite
aggBufferSchema,
groupKeySchema,
taskMemoryManager,
- shuffleMemoryManager,
1024, // initial capacity
PAGE_SIZE_BYTES,
false // disable perf metrics
@@ -153,7 +149,6 @@ class UnsafeFixedWidthAggregationMapSuite
aggBufferSchema,
groupKeySchema,
taskMemoryManager,
- shuffleMemoryManager,
128, // initial capacity
PAGE_SIZE_BYTES,
false // disable perf metrics
@@ -176,14 +171,13 @@ class UnsafeFixedWidthAggregationMapSuite
testWithMemoryLeakDetection("test external sorting") {
// Memory consumption in the beginning of the task.
- val initialMemoryConsumption = shuffleMemoryManager.getMemoryConsumptionForThisTask()
+ val initialMemoryConsumption = taskMemoryManager.getMemoryConsumptionForThisTask()
val map = new UnsafeFixedWidthAggregationMap(
emptyAggregationBuffer,
aggBufferSchema,
groupKeySchema,
taskMemoryManager,
- shuffleMemoryManager,
128, // initial capacity
PAGE_SIZE_BYTES,
false // disable perf metrics
@@ -200,7 +194,7 @@ class UnsafeFixedWidthAggregationMapSuite
val sorter = map.destructAndCreateExternalSorter()
withClue(s"destructAndCreateExternalSorter should release memory used by the map") {
- assert(shuffleMemoryManager.getMemoryConsumptionForThisTask() === initialMemoryConsumption)
+ assert(taskMemoryManager.getMemoryConsumptionForThisTask() === initialMemoryConsumption)
}
// Add more keys to the sorter and make sure the results come out sorted.
@@ -214,7 +208,7 @@ class UnsafeFixedWidthAggregationMapSuite
sorter.insertKV(keyConverter.apply(k), valueConverter.apply(v))
if ((i % 100) == 0) {
- shuffleMemoryManager.markAsOutOfMemory()
+ memoryManager.markExecutionAsOutOfMemory()
sorter.closeCurrentPage()
}
}
@@ -238,7 +232,6 @@ class UnsafeFixedWidthAggregationMapSuite
aggBufferSchema,
groupKeySchema,
taskMemoryManager,
- shuffleMemoryManager,
128, // initial capacity
PAGE_SIZE_BYTES,
false // disable perf metrics
@@ -258,7 +251,7 @@ class UnsafeFixedWidthAggregationMapSuite
sorter.insertKV(keyConverter.apply(k), valueConverter.apply(v))
if ((i % 100) == 0) {
- shuffleMemoryManager.markAsOutOfMemory()
+ memoryManager.markExecutionAsOutOfMemory()
sorter.closeCurrentPage()
}
}
@@ -281,14 +274,13 @@ class UnsafeFixedWidthAggregationMapSuite
testWithMemoryLeakDetection("test external sorting with empty records") {
// Memory consumption in the beginning of the task.
- val initialMemoryConsumption = shuffleMemoryManager.getMemoryConsumptionForThisTask()
+ val initialMemoryConsumption = taskMemoryManager.getMemoryConsumptionForThisTask()
val map = new UnsafeFixedWidthAggregationMap(
emptyAggregationBuffer,
StructType(Nil),
StructType(Nil),
taskMemoryManager,
- shuffleMemoryManager,
128, // initial capacity
PAGE_SIZE_BYTES,
false // disable perf metrics
@@ -303,7 +295,7 @@ class UnsafeFixedWidthAggregationMapSuite
val sorter = map.destructAndCreateExternalSorter()
withClue(s"destructAndCreateExternalSorter should release memory used by the map") {
- assert(shuffleMemoryManager.getMemoryConsumptionForThisTask() === initialMemoryConsumption)
+ assert(taskMemoryManager.getMemoryConsumptionForThisTask() === initialMemoryConsumption)
}
// Add more keys to the sorter and make sure the results come out sorted.
@@ -311,7 +303,7 @@ class UnsafeFixedWidthAggregationMapSuite
sorter.insertKV(UnsafeRow.createFromByteArray(0, 0), UnsafeRow.createFromByteArray(0, 0))
if ((i % 100) == 0) {
- shuffleMemoryManager.markAsOutOfMemory()
+ memoryManager.markExecutionAsOutOfMemory()
sorter.closeCurrentPage()
}
}
@@ -332,34 +324,28 @@ class UnsafeFixedWidthAggregationMapSuite
}
testWithMemoryLeakDetection("convert to external sorter under memory pressure (SPARK-10474)") {
- val smm = ShuffleMemoryManager.createForTesting(65536)
val pageSize = 4096
val map = new UnsafeFixedWidthAggregationMap(
emptyAggregationBuffer,
aggBufferSchema,
groupKeySchema,
taskMemoryManager,
- smm,
128, // initial capacity
pageSize,
false // disable perf metrics
)
- // Insert into the map until we've run out of space
val rand = new Random(42)
- var hasSpace = true
- while (hasSpace) {
+ for (i <- 1 to 100) {
val str = rand.nextString(1024)
val buf = map.getAggregationBuffer(InternalRow(UTF8String.fromString(str)))
- if (buf == null) {
- hasSpace = false
- } else {
- buf.setInt(0, str.length)
- }
+ buf.setInt(0, str.length)
}
-
- // Ensure we're actually maxed out by asserting that we can't acquire even just 1 byte
- assert(smm.tryToAcquire(1) === 0)
+ // Simulate running out of space
+ memoryManager.markExecutionAsOutOfMemory()
+ val str = rand.nextString(1024)
+ val buf = map.getAggregationBuffer(InternalRow(UTF8String.fromString(str)))
+ assert(buf == null)
// Convert the map into a sorter. This used to fail before the fix for SPARK-10474
// because we would try to acquire space for the in-memory sorter pointer array before
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeKVExternalSorterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeKVExternalSorterSuite.scala
index d3be568a87..13dc1754c9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeKVExternalSorterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeKVExternalSorterSuite.scala
@@ -20,12 +20,12 @@ package org.apache.spark.sql.execution
import scala.util.Random
import org.apache.spark._
+import org.apache.spark.memory.{TaskMemoryManager, GrantEverythingMemoryManager}
import org.apache.spark.sql.{RandomDataGenerator, Row}
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
import org.apache.spark.sql.catalyst.expressions.{InterpretedOrdering, UnsafeRow, UnsafeProjection}
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.sql.types._
-import org.apache.spark.unsafe.memory.{ExecutorMemoryManager, MemoryAllocator, TaskMemoryManager}
/**
* Test suite for [[UnsafeKVExternalSorter]], with randomly generated test data.
@@ -108,9 +108,9 @@ class UnsafeKVExternalSorterSuite extends SparkFunSuite with SharedSQLContext {
inputData: Seq[(InternalRow, InternalRow)],
pageSize: Long,
spill: Boolean): Unit = {
-
- val taskMemMgr = new TaskMemoryManager(new ExecutorMemoryManager(MemoryAllocator.HEAP))
- val shuffleMemMgr = new TestShuffleMemoryManager
+ val memoryManager =
+ new GrantEverythingMemoryManager(new SparkConf().set("spark.unsafe.offHeap", "false"))
+ val taskMemMgr = new TaskMemoryManager(memoryManager, 0)
TaskContext.setTaskContext(new TaskContextImpl(
stageId = 0,
partitionId = 0,
@@ -121,14 +121,14 @@ class UnsafeKVExternalSorterSuite extends SparkFunSuite with SharedSQLContext {
internalAccumulators = Seq.empty))
val sorter = new UnsafeKVExternalSorter(
- keySchema, valueSchema, SparkEnv.get.blockManager, shuffleMemMgr, pageSize)
+ keySchema, valueSchema, SparkEnv.get.blockManager, pageSize)
// Insert the keys and values into the sorter
inputData.foreach { case (k, v) =>
sorter.insertKV(k.asInstanceOf[UnsafeRow], v.asInstanceOf[UnsafeRow])
// 1% chance we will spill
if (rand.nextDouble() < 0.01 && spill) {
- shuffleMemMgr.markAsOutOfMemory()
+ memoryManager.markExecutionAsOutOfMemory()
sorter.closeCurrentPage()
}
}
@@ -170,12 +170,7 @@ class UnsafeKVExternalSorterSuite extends SparkFunSuite with SharedSQLContext {
assert(out.sorted(kvOrdering) === inputData.sorted(kvOrdering))
// Make sure there is no memory leak
- val leakedUnsafeMemory: Long = taskMemMgr.cleanUpAllAllocatedMemory
- if (shuffleMemMgr != null) {
- val leakedShuffleMemory: Long = shuffleMemMgr.getMemoryConsumptionForThisTask()
- assert(0L === leakedShuffleMemory)
- }
- assert(0 === leakedUnsafeMemory)
+ assert(0 === taskMemMgr.cleanUpAllAllocatedMemory)
TaskContext.unset()
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala
index 1680d7e0a8..d32572b54b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.execution
import java.io.{File, ByteArrayInputStream, ByteArrayOutputStream}
import org.apache.spark.executor.ShuffleWriteMetrics
+import org.apache.spark.memory.TaskMemoryManager
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.ShuffleBlockId
import org.apache.spark.util.collection.ExternalSorter
@@ -112,7 +113,12 @@ class UnsafeRowSerializerSuite extends SparkFunSuite with LocalSparkContext {
val data = (1 to 10000).iterator.map { i =>
(i, converter(Row(i)))
}
+ val taskMemoryManager = new TaskMemoryManager(sc.env.memoryManager, 0)
+ val taskContext = new TaskContextImpl(
+ 0, 0, 0, 0, taskMemoryManager, null, InternalAccumulator.create(sc))
+
val sorter = new ExternalSorter[Int, UnsafeRow, UnsafeRow](
+ taskContext,
partitioner = Some(new HashPartitioner(10)),
serializer = Some(new UnsafeRowSerializer(numFields = 1)))
@@ -122,10 +128,8 @@ class UnsafeRowSerializerSuite extends SparkFunSuite with LocalSparkContext {
assert(sorter.numSpills > 0)
// Merging spilled files should not throw assertion error
- val taskContext =
- new TaskContextImpl(0, 0, 0, 0, null, null, InternalAccumulator.create(sc))
taskContext.taskMetrics.shuffleWriteMetrics = Some(new ShuffleWriteMetrics)
- sorter.writePartitionedFile(ShuffleBlockId(0, 0, 0), taskContext, outputFile)
+ sorter.writePartitionedFile(ShuffleBlockId(0, 0, 0), outputFile)
} {
// Clean up
if (sc != null) {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIteratorSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIteratorSuite.scala
index cc0ac1b07c..475037bd45 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIteratorSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIteratorSuite.scala
@@ -18,16 +18,16 @@
package org.apache.spark.sql.execution.aggregate
import org.apache.spark._
+import org.apache.spark.memory.TaskMemoryManager
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.InterpretedMutableProjection
import org.apache.spark.sql.execution.metric.SQLMetrics
import org.apache.spark.sql.test.SharedSQLContext
-import org.apache.spark.unsafe.memory.TaskMemoryManager
class TungstenAggregationIteratorSuite extends SparkFunSuite with SharedSQLContext {
test("memory acquired on construction") {
- val taskMemoryManager = new TaskMemoryManager(SparkEnv.get.executorMemoryManager)
+ val taskMemoryManager = new TaskMemoryManager(SparkEnv.get.memoryManager, 0)
val taskContext = new TaskContextImpl(0, 0, 0, 0, taskMemoryManager, null, Seq.empty)
TaskContext.setTaskContext(taskContext)