author     Josh Rosen <joshrosen@databricks.com>   2015-10-25 21:19:52 -0700
committer  Josh Rosen <joshrosen@databricks.com>   2015-10-25 21:19:52 -0700
commit     85e654c5ec87e666a8845bfd77185c1ea57b268a (patch)
tree       2beadbc8fbb54369325970a4e2c7189506efad89 /sql/core
parent     63accc79625d8a03d0624717af5e1d81b18a6da3 (diff)
[SPARK-10984] Simplify *MemoryManager class structure
This patch refactors the MemoryManager class structure. After #9000, Spark had the following classes:
- MemoryManager
- StaticMemoryManager
- ExecutorMemoryManager
- TaskMemoryManager
- ShuffleMemoryManager
This is fairly confusing. To simplify things, this patch consolidates several of these classes:
- ShuffleMemoryManager and ExecutorMemoryManager were merged into MemoryManager.
- TaskMemoryManager was moved into Spark Core (see the construction sketch below).
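To make the new shape concrete, here is a minimal Scala sketch of how the consolidated classes compose after this patch, adapted from the `HashedRelation.readExternal` change in the diff below (the `Long.MaxValue` limits and the `spark.unsafe.offHeap` setting are the ones used there for the broadcast code path, not recommended values):

```scala
import org.apache.spark.SparkConf
import org.apache.spark.memory.{StaticMemoryManager, TaskMemoryManager}

// StaticMemoryManager is a concrete MemoryManager; it now owns the
// execution-memory accounting that ShuffleMemoryManager used to provide.
val memoryManager = new StaticMemoryManager(
  new SparkConf().set("spark.unsafe.offHeap", "false"), Long.MaxValue, Long.MaxValue, 1)

// TaskMemoryManager (now in spark-core) is a per-task view over the
// executor-wide MemoryManager; the second argument is the task attempt id.
val taskMemoryManager = new TaskMemoryManager(memoryManager, 0)
```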
**Key changes and tasks**:
- [x] Merge ExecutorMemoryManager into MemoryManager.
- [x] Move pooling logic into Allocator.
- [x] Move TaskMemoryManager from `spark-unsafe` to `spark-core`.
- [x] Refactor the existing Tungsten TaskMemoryManager interactions so that Tungsten code uses only TaskMemoryManager, not both it and ShuffleMemoryManager.
- [x] Refactor non-Tungsten code to use the TaskMemoryManager instead of ShuffleMemoryManager.
- [x] Merge ShuffleMemoryManager into MemoryManager.
- [x] Move code
- [x] ~~Simplify 1/n calculation.~~ **Will defer to followup, since this needs more work.**
- [x] Port ShuffleMemoryManagerSuite tests.
- [x] Move classes from `unsafe` package to `memory` package.
- [ ] Figure out how to handle the hacky use of the memory managers in HashedRelation's broadcast variable construction.
- [x] Test porting and cleanup: several tests relied on mock functionality (such as `TestShuffleMemoryManager.markAsOutOfMemory`) that was changed or broken during the memory manager consolidation; the replacement pattern is sketched after this list. Affected suites:
- [x] AbstractBytesToBytesMapSuite
- [x] UnsafeExternalSorterSuite
- [x] UnsafeFixedWidthAggregationMapSuite
- [x] UnsafeKVExternalSorterSuite
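As context for the test-porting items above: the ported suites replace `TestShuffleMemoryManager.markAsOutOfMemory` with the pattern below, a sketch assembled from the suite changes in this diff (it uses no APIs beyond those visible there):

```scala
import org.apache.spark.SparkConf
import org.apache.spark.memory.{GrantEverythingMemoryManager, TaskMemoryManager}

// GrantEverythingMemoryManager normally grants every allocation request.
val conf = new SparkConf().set("spark.unsafe.offHeap", "false")
val memoryManager = new GrantEverythingMemoryManager(conf)
val taskMemoryManager = new TaskMemoryManager(memoryManager, 0)

// ... exercise the code under test, then simulate memory pressure:
memoryManager.markExecutionAsOutOfMemory() // the next acquisition is denied
```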
**Compatibility notes**:
- This patch introduces breaking changes in `ExternalAppendOnlyMap`, which is marked as `DeveloperApi` (likely for legacy reasons): this class can no longer be used outside of a running task; see the sketch below.
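The same constraint shows up in `ExternalSorter`, as in the `sort.scala` hunk below. A sketch, with `ordering` standing in for the operator's own comparator: these spilling collections now obtain their `TaskMemoryManager` through the `TaskContext` passed at construction, so they must be created inside a running task, where `TaskContext.get()` is non-null.

```scala
import org.apache.spark.TaskContext
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.util.collection.ExternalSorter

// Outside of a running task, TaskContext.get() returns null and construction fails.
val ordering: Ordering[InternalRow] = ??? // supplied by the surrounding operator
val sorter = new ExternalSorter[InternalRow, Null, InternalRow](
  TaskContext.get(), ordering = Some(ordering))
```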
Author: Josh Rosen <joshrosen@databricks.com>
Closes #9127 from JoshRosen/SPARK-10984.
Diffstat (limited to 'sql/core')
11 files changed, 70 insertions, 164 deletions
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMap.java b/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMap.java
index 09511ff35f..82c645df28 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMap.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMap.java
@@ -22,7 +22,6 @@ import java.io.IOException;
 import com.google.common.annotations.VisibleForTesting;
 
 import org.apache.spark.SparkEnv;
-import org.apache.spark.shuffle.ShuffleMemoryManager;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.catalyst.expressions.UnsafeProjection;
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
@@ -32,7 +31,7 @@ import org.apache.spark.unsafe.KVIterator;
 import org.apache.spark.unsafe.Platform;
 import org.apache.spark.unsafe.map.BytesToBytesMap;
 import org.apache.spark.unsafe.memory.MemoryLocation;
-import org.apache.spark.unsafe.memory.TaskMemoryManager;
+import org.apache.spark.memory.TaskMemoryManager;
 
 /**
  * Unsafe-based HashMap for performing aggregations where the aggregated values are fixed-width.
@@ -88,8 +87,6 @@ public final class UnsafeFixedWidthAggregationMap {
    * @param aggregationBufferSchema the schema of the aggregation buffer, used for row conversion.
    * @param groupingKeySchema the schema of the grouping key, used for row conversion.
    * @param taskMemoryManager the memory manager used to allocate our Unsafe memory structures.
-   * @param shuffleMemoryManager the shuffle memory manager, for coordinating our memory usage with
-   *                             other tasks.
    * @param initialCapacity the initial capacity of the map (a sizing hint to avoid re-hashing).
    * @param pageSizeBytes the data page size, in bytes; limits the maximum record size.
    * @param enablePerfMetrics if true, performance metrics will be recorded (has minor perf impact)
@@ -99,15 +96,14 @@ public final class UnsafeFixedWidthAggregationMap {
       StructType aggregationBufferSchema,
       StructType groupingKeySchema,
       TaskMemoryManager taskMemoryManager,
-      ShuffleMemoryManager shuffleMemoryManager,
       int initialCapacity,
       long pageSizeBytes,
       boolean enablePerfMetrics) {
     this.aggregationBufferSchema = aggregationBufferSchema;
     this.groupingKeyProjection = UnsafeProjection.create(groupingKeySchema);
     this.groupingKeySchema = groupingKeySchema;
-    this.map = new BytesToBytesMap(
-      taskMemoryManager, shuffleMemoryManager, initialCapacity, pageSizeBytes, enablePerfMetrics);
+    this.map =
+      new BytesToBytesMap(taskMemoryManager, initialCapacity, pageSizeBytes, enablePerfMetrics);
     this.enablePerfMetrics = enablePerfMetrics;
 
     // Initialize the buffer for aggregation value
@@ -256,7 +252,7 @@ public final class UnsafeFixedWidthAggregationMap {
   public UnsafeKVExternalSorter destructAndCreateExternalSorter() throws IOException {
     UnsafeKVExternalSorter sorter = new UnsafeKVExternalSorter(
       groupingKeySchema, aggregationBufferSchema,
-      SparkEnv.get().blockManager(), map.getShuffleMemoryManager(), map.getPageSizeBytes(), map);
+      SparkEnv.get().blockManager(), map.getPageSizeBytes(), map);
     return sorter;
   }
 }
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java b/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java
index 9df5780e4f..46301f0042 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java
@@ -24,7 +24,6 @@ import javax.annotation.Nullable;
 import com.google.common.annotations.VisibleForTesting;
 
 import org.apache.spark.TaskContext;
-import org.apache.spark.shuffle.ShuffleMemoryManager;
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
 import org.apache.spark.sql.catalyst.expressions.codegen.BaseOrdering;
 import org.apache.spark.sql.catalyst.expressions.codegen.GenerateOrdering;
@@ -34,7 +33,7 @@ import org.apache.spark.unsafe.KVIterator;
 import org.apache.spark.unsafe.Platform;
 import org.apache.spark.unsafe.map.BytesToBytesMap;
 import org.apache.spark.unsafe.memory.MemoryBlock;
-import org.apache.spark.unsafe.memory.TaskMemoryManager;
+import org.apache.spark.memory.TaskMemoryManager;
 import org.apache.spark.util.collection.unsafe.sort.*;
 
 /**
@@ -50,14 +49,19 @@ public final class UnsafeKVExternalSorter {
   private final UnsafeExternalRowSorter.PrefixComputer prefixComputer;
   private final UnsafeExternalSorter sorter;
 
-  public UnsafeKVExternalSorter(StructType keySchema, StructType valueSchema,
-      BlockManager blockManager, ShuffleMemoryManager shuffleMemoryManager, long pageSizeBytes)
-      throws IOException {
-    this(keySchema, valueSchema, blockManager, shuffleMemoryManager, pageSizeBytes, null);
+  public UnsafeKVExternalSorter(
+      StructType keySchema,
+      StructType valueSchema,
+      BlockManager blockManager,
+      long pageSizeBytes) throws IOException {
+    this(keySchema, valueSchema, blockManager, pageSizeBytes, null);
   }
 
-  public UnsafeKVExternalSorter(StructType keySchema, StructType valueSchema,
-      BlockManager blockManager, ShuffleMemoryManager shuffleMemoryManager, long pageSizeBytes,
+  public UnsafeKVExternalSorter(
+      StructType keySchema,
+      StructType valueSchema,
+      BlockManager blockManager,
+      long pageSizeBytes,
       @Nullable BytesToBytesMap map) throws IOException {
     this.keySchema = keySchema;
     this.valueSchema = valueSchema;
@@ -73,7 +77,6 @@ public final class UnsafeKVExternalSorter {
     if (map == null) {
       sorter = UnsafeExternalSorter.create(
         taskMemoryManager,
-        shuffleMemoryManager,
         blockManager,
         taskContext,
         recordComparator,
@@ -115,7 +118,6 @@ public final class UnsafeKVExternalSorter {
 
       sorter = UnsafeExternalSorter.createWithExistingInMemorySorter(
         taskContext.taskMemoryManager(),
-        shuffleMemoryManager,
         blockManager,
         taskContext,
         new KVComparator(ordering, keySchema.length()),
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
index 7cd0f7b81e..fb2fc98e34 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.aggregate
 import scala.collection.mutable.ArrayBuffer
 
 import org.apache.spark.unsafe.KVIterator
-import org.apache.spark.{InternalAccumulator, Logging, SparkEnv, TaskContext}
+import org.apache.spark.{InternalAccumulator, Logging, TaskContext}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate._
 import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeRowJoiner
@@ -34,7 +34,7 @@ import org.apache.spark.sql.types.StructType
  *
  * This iterator first uses hash-based aggregation to process input rows. It uses
  * a hash map to store groups and their corresponding aggregation buffers. If we
- * this map cannot allocate memory from [[org.apache.spark.shuffle.ShuffleMemoryManager]],
+ * this map cannot allocate memory from memory manager,
  * it switches to sort-based aggregation. The process of the switch has the following step:
  *  - Step 1: Sort all entries of the hash map based on values of grouping expressions and
  *            spill them to disk.
@@ -480,10 +480,9 @@ class TungstenAggregationIterator(
         initialAggregationBuffer,
         StructType.fromAttributes(allAggregateFunctions.flatMap(_.aggBufferAttributes)),
         StructType.fromAttributes(groupingExpressions.map(_.toAttribute)),
-        TaskContext.get.taskMemoryManager(),
-        SparkEnv.get.shuffleMemoryManager,
+        TaskContext.get().taskMemoryManager(),
         1024 * 16, // initial capacity
-        SparkEnv.get.shuffleMemoryManager.pageSizeBytes,
+        TaskContext.get().taskMemoryManager().pageSizeBytes,
         false // disable tracking of performance metrics
       )
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
index cfd64c1d9e..1b59b19d94 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
@@ -344,8 +344,7 @@ private[sql] class DynamicPartitionWriterContainer(
             StructType.fromAttributes(partitionColumns),
             StructType.fromAttributes(dataColumns),
             SparkEnv.get.blockManager,
-            SparkEnv.get.shuffleMemoryManager,
-            SparkEnv.get.shuffleMemoryManager.pageSizeBytes)
+            TaskContext.get().taskMemoryManager().pageSizeBytes)
           sorter.insertKV(currentKey, getOutputRow(inputRow))
         }
       } else {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
index bc255b2750..cc8abb1ba4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
@@ -21,7 +21,7 @@ import java.io.{Externalizable, IOException, ObjectInput, ObjectOutput}
 import java.nio.ByteOrder
 import java.util.{HashMap => JavaHashMap}
 
-import org.apache.spark.shuffle.ShuffleMemoryManager
+import org.apache.spark.memory.{TaskMemoryManager, StaticMemoryManager}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.execution.SparkSqlSerializer
@@ -29,7 +29,7 @@ import org.apache.spark.sql.execution.local.LocalNode
 import org.apache.spark.sql.execution.metric.{LongSQLMetric, SQLMetrics}
 import org.apache.spark.unsafe.Platform
 import org.apache.spark.unsafe.map.BytesToBytesMap
-import org.apache.spark.unsafe.memory.{MemoryLocation, ExecutorMemoryManager, MemoryAllocator, TaskMemoryManager}
+import org.apache.spark.unsafe.memory.MemoryLocation
 import org.apache.spark.util.Utils
 import org.apache.spark.util.collection.CompactBuffer
 import org.apache.spark.{SparkConf, SparkEnv}
@@ -320,21 +320,20 @@ private[joins] final class UnsafeHashedRelation(
   override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException {
     val nKeys = in.readInt()
     // This is used in Broadcast, shared by multiple tasks, so we use on-heap memory
-    val taskMemoryManager = new TaskMemoryManager(new ExecutorMemoryManager(MemoryAllocator.HEAP))
+    // TODO(josh): This needs to be revisited before we merge this patch; making this change now
+    // so that tests compile:
+    val taskMemoryManager = new TaskMemoryManager(
+      new StaticMemoryManager(
+        new SparkConf().set("spark.unsafe.offHeap", "false"), Long.MaxValue, Long.MaxValue, 1), 0)
 
-    val pageSizeBytes = Option(SparkEnv.get).map(_.shuffleMemoryManager.pageSizeBytes)
+    val pageSizeBytes = Option(SparkEnv.get).map(_.memoryManager.pageSizeBytes)
       .getOrElse(new SparkConf().getSizeAsBytes("spark.buffer.pageSize", "16m"))
 
-    // Dummy shuffle memory manager which always grants all memory allocation requests.
-    // We use this because it doesn't make sense count shared broadcast variables' memory usage
-    // towards individual tasks' quotas. In the future, we should devise a better way of handling
-    // this.
-    val shuffleMemoryManager =
-      ShuffleMemoryManager.create(maxMemory = Long.MaxValue, pageSizeBytes = pageSizeBytes)
+    // TODO(josh): We won't need this dummy memory manager after future refactorings; revisit
+    // during code review
 
     binaryMap = new BytesToBytesMap(
       taskMemoryManager,
-      shuffleMemoryManager,
       (nKeys * 1.5 + 1).toInt, // reduce hash collision
       pageSizeBytes)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/sort.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/sort.scala
index 9385e5734d..dd92dda480 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/sort.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/sort.scala
@@ -49,7 +49,8 @@ case class Sort(
   protected override def doExecute(): RDD[InternalRow] = attachTree(this, "sort") {
     child.execute().mapPartitions( { iterator =>
       val ordering = newOrdering(sortOrder, child.output)
-      val sorter = new ExternalSorter[InternalRow, Null, InternalRow](ordering = Some(ordering))
+      val sorter = new ExternalSorter[InternalRow, Null, InternalRow](
+        TaskContext.get(), ordering = Some(ordering))
       sorter.insertAll(iterator.map(r => (r.copy(), null)))
       val baseIterator = sorter.iterator.map(_._1)
       val context = TaskContext.get()
@@ -124,7 +125,7 @@ case class TungstenSort(
       }
     }
 
-    val pageSize = SparkEnv.get.shuffleMemoryManager.pageSizeBytes
+    val pageSize = SparkEnv.get.memoryManager.pageSizeBytes
     val sorter = new UnsafeExternalRowSorter(
       schema, ordering, prefixComparator, prefixComputer, pageSize)
     if (testSpillFrequency > 0) {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/TestShuffleMemoryManager.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/TestShuffleMemoryManager.scala
deleted file mode 100644
index c4358f409b..0000000000
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/TestShuffleMemoryManager.scala
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution
-
-import scala.collection.mutable
-
-import org.apache.spark.memory.MemoryManager
-import org.apache.spark.shuffle.ShuffleMemoryManager
-import org.apache.spark.storage.{BlockId, BlockStatus}
-
-
-/**
- * A [[ShuffleMemoryManager]] that can be controlled to run out of memory.
- */
-class TestShuffleMemoryManager
-  extends ShuffleMemoryManager(new GrantEverythingMemoryManager, 4 * 1024 * 1024) {
-  private var oom = false
-
-  override def tryToAcquire(numBytes: Long): Long = {
-    if (oom) {
-      oom = false
-      0
-    } else {
-      // Uncomment the following to trace memory allocations.
-      // println(s"tryToAcquire $numBytes in " +
-      //   Thread.currentThread().getStackTrace.mkString("", "\n  -", ""))
-      val acquired = super.tryToAcquire(numBytes)
-      acquired
-    }
-  }
-
-  override def release(numBytes: Long): Unit = {
-    // Uncomment the following to trace memory releases.
-    // println(s"release $numBytes in " +
-    //   Thread.currentThread().getStackTrace.mkString("", "\n  -", ""))
-    super.release(numBytes)
-  }
-
-  def markAsOutOfMemory(): Unit = {
-    oom = true
-  }
-}
-
-private class GrantEverythingMemoryManager extends MemoryManager {
-  override def acquireExecutionMemory(
-      numBytes: Long,
-      evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Long = numBytes
-  override def acquireStorageMemory(
-      blockId: BlockId,
-      numBytes: Long,
-      evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = true
-  override def acquireUnrollMemory(
-      blockId: BlockId,
-      numBytes: Long,
-      evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = true
-  override def releaseExecutionMemory(numBytes: Long): Unit = { }
-  override def releaseStorageMemory(numBytes: Long): Unit = { }
-  override def maxExecutionMemory: Long = Long.MaxValue
-  override def maxStorageMemory: Long = Long.MaxValue
-}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMapSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMapSuite.scala
index 1739798a24..dbf4863b76 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMapSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMapSuite.scala
@@ -23,13 +23,12 @@ import scala.util.{Try, Random}
 
 import org.scalatest.Matchers
 
-import org.apache.spark.{TaskContextImpl, TaskContext, SparkFunSuite}
-import org.apache.spark.shuffle.ShuffleMemoryManager
+import org.apache.spark.{SparkConf, TaskContextImpl, TaskContext, SparkFunSuite}
+import org.apache.spark.memory.{TaskMemoryManager, GrantEverythingMemoryManager}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{UnsafeRow, UnsafeProjection}
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types._
-import org.apache.spark.unsafe.memory.{ExecutorMemoryManager, MemoryAllocator, TaskMemoryManager}
 import org.apache.spark.unsafe.types.UTF8String
 
 /**
@@ -49,23 +48,22 @@ class UnsafeFixedWidthAggregationMapSuite
   private def emptyAggregationBuffer: InternalRow = InternalRow(0)
   private val PAGE_SIZE_BYTES: Long = 1L << 26; // 64 megabytes
 
+  private var memoryManager: GrantEverythingMemoryManager = null
   private var taskMemoryManager: TaskMemoryManager = null
-  private var shuffleMemoryManager: TestShuffleMemoryManager = null
 
   def testWithMemoryLeakDetection(name: String)(f: => Unit) {
     def cleanup(): Unit = {
       if (taskMemoryManager != null) {
-        val leakedShuffleMemory = shuffleMemoryManager.getMemoryConsumptionForThisTask()
         assert(taskMemoryManager.cleanUpAllAllocatedMemory() === 0)
-        assert(leakedShuffleMemory === 0)
         taskMemoryManager = null
       }
       TaskContext.unset()
     }
 
     test(name) {
-      taskMemoryManager = new TaskMemoryManager(new ExecutorMemoryManager(MemoryAllocator.HEAP))
-      shuffleMemoryManager = new TestShuffleMemoryManager
+      val conf = new SparkConf().set("spark.unsafe.offHeap", "false")
+      memoryManager = new GrantEverythingMemoryManager(conf)
+      taskMemoryManager = new TaskMemoryManager(memoryManager, 0)
 
       TaskContext.setTaskContext(new TaskContextImpl(
         stageId = 0,
@@ -110,7 +108,6 @@ class UnsafeFixedWidthAggregationMapSuite
       aggBufferSchema,
       groupKeySchema,
       taskMemoryManager,
-      shuffleMemoryManager,
       1024, // initial capacity,
       PAGE_SIZE_BYTES,
       false // disable perf metrics
@@ -125,7 +122,6 @@ class UnsafeFixedWidthAggregationMapSuite
       aggBufferSchema,
       groupKeySchema,
       taskMemoryManager,
-      shuffleMemoryManager,
       1024, // initial capacity
       PAGE_SIZE_BYTES,
       false // disable perf metrics
@@ -153,7 +149,6 @@ class UnsafeFixedWidthAggregationMapSuite
       aggBufferSchema,
       groupKeySchema,
       taskMemoryManager,
-      shuffleMemoryManager,
       128, // initial capacity
       PAGE_SIZE_BYTES,
       false // disable perf metrics
@@ -176,14 +171,13 @@ class UnsafeFixedWidthAggregationMapSuite
   testWithMemoryLeakDetection("test external sorting") {
     // Memory consumption in the beginning of the task.
-    val initialMemoryConsumption = shuffleMemoryManager.getMemoryConsumptionForThisTask()
+    val initialMemoryConsumption = taskMemoryManager.getMemoryConsumptionForThisTask()
 
     val map = new UnsafeFixedWidthAggregationMap(
       emptyAggregationBuffer,
       aggBufferSchema,
       groupKeySchema,
       taskMemoryManager,
-      shuffleMemoryManager,
       128, // initial capacity
       PAGE_SIZE_BYTES,
       false // disable perf metrics
@@ -200,7 +194,7 @@ class UnsafeFixedWidthAggregationMapSuite
     val sorter = map.destructAndCreateExternalSorter()
 
     withClue(s"destructAndCreateExternalSorter should release memory used by the map") {
-      assert(shuffleMemoryManager.getMemoryConsumptionForThisTask() === initialMemoryConsumption)
+      assert(taskMemoryManager.getMemoryConsumptionForThisTask() === initialMemoryConsumption)
     }
 
     // Add more keys to the sorter and make sure the results come out sorted.
@@ -214,7 +208,7 @@ class UnsafeFixedWidthAggregationMapSuite
       sorter.insertKV(keyConverter.apply(k), valueConverter.apply(v))
 
       if ((i % 100) == 0) {
-        shuffleMemoryManager.markAsOutOfMemory()
+        memoryManager.markExecutionAsOutOfMemory()
         sorter.closeCurrentPage()
       }
     }
@@ -238,7 +232,6 @@ class UnsafeFixedWidthAggregationMapSuite
       aggBufferSchema,
       groupKeySchema,
       taskMemoryManager,
-      shuffleMemoryManager,
       128, // initial capacity
       PAGE_SIZE_BYTES,
       false // disable perf metrics
@@ -258,7 +251,7 @@ class UnsafeFixedWidthAggregationMapSuite
       sorter.insertKV(keyConverter.apply(k), valueConverter.apply(v))
 
       if ((i % 100) == 0) {
-        shuffleMemoryManager.markAsOutOfMemory()
+        memoryManager.markExecutionAsOutOfMemory()
         sorter.closeCurrentPage()
       }
     }
@@ -281,14 +274,13 @@ class UnsafeFixedWidthAggregationMapSuite
   testWithMemoryLeakDetection("test external sorting with empty records") {
     // Memory consumption in the beginning of the task.
-    val initialMemoryConsumption = shuffleMemoryManager.getMemoryConsumptionForThisTask()
+    val initialMemoryConsumption = taskMemoryManager.getMemoryConsumptionForThisTask()
 
     val map = new UnsafeFixedWidthAggregationMap(
       emptyAggregationBuffer,
       StructType(Nil),
       StructType(Nil),
       taskMemoryManager,
-      shuffleMemoryManager,
       128, // initial capacity
       PAGE_SIZE_BYTES,
       false // disable perf metrics
@@ -303,7 +295,7 @@ class UnsafeFixedWidthAggregationMapSuite
     val sorter = map.destructAndCreateExternalSorter()
 
     withClue(s"destructAndCreateExternalSorter should release memory used by the map") {
-      assert(shuffleMemoryManager.getMemoryConsumptionForThisTask() === initialMemoryConsumption)
+      assert(taskMemoryManager.getMemoryConsumptionForThisTask() === initialMemoryConsumption)
     }
 
     // Add more keys to the sorter and make sure the results come out sorted.
@@ -311,7 +303,7 @@ class UnsafeFixedWidthAggregationMapSuite
       sorter.insertKV(UnsafeRow.createFromByteArray(0, 0), UnsafeRow.createFromByteArray(0, 0))
 
       if ((i % 100) == 0) {
-        shuffleMemoryManager.markAsOutOfMemory()
+        memoryManager.markExecutionAsOutOfMemory()
         sorter.closeCurrentPage()
       }
     }
@@ -332,34 +324,28 @@ class UnsafeFixedWidthAggregationMapSuite
   }
 
   testWithMemoryLeakDetection("convert to external sorter under memory pressure (SPARK-10474)") {
-    val smm = ShuffleMemoryManager.createForTesting(65536)
     val pageSize = 4096
     val map = new UnsafeFixedWidthAggregationMap(
       emptyAggregationBuffer,
       aggBufferSchema,
       groupKeySchema,
       taskMemoryManager,
-      smm,
       128, // initial capacity
       pageSize,
       false // disable perf metrics
     )
 
-    // Insert into the map until we've run out of space
     val rand = new Random(42)
-    var hasSpace = true
-    while (hasSpace) {
+    for (i <- 1 to 100) {
       val str = rand.nextString(1024)
       val buf = map.getAggregationBuffer(InternalRow(UTF8String.fromString(str)))
-      if (buf == null) {
-        hasSpace = false
-      } else {
-        buf.setInt(0, str.length)
-      }
+      buf.setInt(0, str.length)
     }
-
-    // Ensure we're actually maxed out by asserting that we can't acquire even just 1 byte
-    assert(smm.tryToAcquire(1) === 0)
+    // Simulate running out of space
+    memoryManager.markExecutionAsOutOfMemory()
+    val str = rand.nextString(1024)
+    val buf = map.getAggregationBuffer(InternalRow(UTF8String.fromString(str)))
+    assert(buf == null)
 
     // Convert the map into a sorter. This used to fail before the fix for SPARK-10474
     // because we would try to acquire space for the in-memory sorter pointer array before
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeKVExternalSorterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeKVExternalSorterSuite.scala
index d3be568a87..13dc1754c9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeKVExternalSorterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeKVExternalSorterSuite.scala
@@ -20,12 +20,12 @@ package org.apache.spark.sql.execution
 import scala.util.Random
 
 import org.apache.spark._
+import org.apache.spark.memory.{TaskMemoryManager, GrantEverythingMemoryManager}
 import org.apache.spark.sql.{RandomDataGenerator, Row}
 import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
 import org.apache.spark.sql.catalyst.expressions.{InterpretedOrdering, UnsafeRow, UnsafeProjection}
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types._
-import org.apache.spark.unsafe.memory.{ExecutorMemoryManager, MemoryAllocator, TaskMemoryManager}
 
 /**
  * Test suite for [[UnsafeKVExternalSorter]], with randomly generated test data.
@@ -108,9 +108,9 @@ class UnsafeKVExternalSorterSuite extends SparkFunSuite with SharedSQLContext {
       inputData: Seq[(InternalRow, InternalRow)],
       pageSize: Long,
       spill: Boolean): Unit = {
-
-    val taskMemMgr = new TaskMemoryManager(new ExecutorMemoryManager(MemoryAllocator.HEAP))
-    val shuffleMemMgr = new TestShuffleMemoryManager
+    val memoryManager =
+      new GrantEverythingMemoryManager(new SparkConf().set("spark.unsafe.offHeap", "false"))
+    val taskMemMgr = new TaskMemoryManager(memoryManager, 0)
     TaskContext.setTaskContext(new TaskContextImpl(
       stageId = 0,
       partitionId = 0,
@@ -121,14 +121,14 @@ class UnsafeKVExternalSorterSuite extends SparkFunSuite with SharedSQLContext {
       internalAccumulators = Seq.empty))
 
     val sorter = new UnsafeKVExternalSorter(
-      keySchema, valueSchema, SparkEnv.get.blockManager, shuffleMemMgr, pageSize)
+      keySchema, valueSchema, SparkEnv.get.blockManager, pageSize)
 
     // Insert the keys and values into the sorter
     inputData.foreach { case (k, v) =>
       sorter.insertKV(k.asInstanceOf[UnsafeRow], v.asInstanceOf[UnsafeRow])
       // 1% chance we will spill
       if (rand.nextDouble() < 0.01 && spill) {
-        shuffleMemMgr.markAsOutOfMemory()
+        memoryManager.markExecutionAsOutOfMemory()
         sorter.closeCurrentPage()
       }
     }
@@ -170,12 +170,7 @@ class UnsafeKVExternalSorterSuite extends SparkFunSuite with SharedSQLContext {
     assert(out.sorted(kvOrdering) === inputData.sorted(kvOrdering))
 
     // Make sure there is no memory leak
-    val leakedUnsafeMemory: Long = taskMemMgr.cleanUpAllAllocatedMemory
-    if (shuffleMemMgr != null) {
-      val leakedShuffleMemory: Long = shuffleMemMgr.getMemoryConsumptionForThisTask()
-      assert(0L === leakedShuffleMemory)
-    }
-    assert(0 === leakedUnsafeMemory)
+    assert(0 === taskMemMgr.cleanUpAllAllocatedMemory)
 
     TaskContext.unset()
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala
index 1680d7e0a8..d32572b54b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.execution
 import java.io.{File, ByteArrayInputStream, ByteArrayOutputStream}
 
 import org.apache.spark.executor.ShuffleWriteMetrics
+import org.apache.spark.memory.TaskMemoryManager
 import org.apache.spark.rdd.RDD
 import org.apache.spark.storage.ShuffleBlockId
 import org.apache.spark.util.collection.ExternalSorter
@@ -112,7 +113,12 @@ class UnsafeRowSerializerSuite extends SparkFunSuite with LocalSparkContext {
       val data = (1 to 10000).iterator.map { i =>
         (i, converter(Row(i)))
       }
+      val taskMemoryManager = new TaskMemoryManager(sc.env.memoryManager, 0)
+      val taskContext = new TaskContextImpl(
+        0, 0, 0, 0, taskMemoryManager, null, InternalAccumulator.create(sc))
+
       val sorter = new ExternalSorter[Int, UnsafeRow, UnsafeRow](
+        taskContext,
         partitioner = Some(new HashPartitioner(10)),
         serializer = Some(new UnsafeRowSerializer(numFields = 1)))
@@ -122,10 +128,8 @@ class UnsafeRowSerializerSuite extends SparkFunSuite with LocalSparkContext {
       assert(sorter.numSpills > 0)
 
       // Merging spilled files should not throw assertion error
-      val taskContext =
-        new TaskContextImpl(0, 0, 0, 0, null, null, InternalAccumulator.create(sc))
       taskContext.taskMetrics.shuffleWriteMetrics = Some(new ShuffleWriteMetrics)
-      sorter.writePartitionedFile(ShuffleBlockId(0, 0, 0), taskContext, outputFile)
+      sorter.writePartitionedFile(ShuffleBlockId(0, 0, 0), outputFile)
     } {
       // Clean up
       if (sc != null) {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIteratorSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIteratorSuite.scala
index cc0ac1b07c..475037bd45 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIteratorSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIteratorSuite.scala
@@ -18,16 +18,16 @@
 package org.apache.spark.sql.execution.aggregate
 
 import org.apache.spark._
+import org.apache.spark.memory.TaskMemoryManager
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.InterpretedMutableProjection
 import org.apache.spark.sql.execution.metric.SQLMetrics
 import org.apache.spark.sql.test.SharedSQLContext
-import org.apache.spark.unsafe.memory.TaskMemoryManager
 
 class TungstenAggregationIteratorSuite extends SparkFunSuite with SharedSQLContext {
 
   test("memory acquired on construction") {
-    val taskMemoryManager = new TaskMemoryManager(SparkEnv.get.executorMemoryManager)
+    val taskMemoryManager = new TaskMemoryManager(SparkEnv.get.memoryManager, 0)
     val taskContext = new TaskContextImpl(0, 0, 0, 0, taskMemoryManager, null, Seq.empty)
     TaskContext.setTaskContext(taskContext)