diff options
Diffstat (limited to 'sql/core')
5 files changed, 20 insertions, 83 deletions
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMap.java b/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMap.java index 82c645df28..889f970034 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMap.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMap.java @@ -165,7 +165,7 @@ public final class UnsafeFixedWidthAggregationMap { public KVIterator<UnsafeRow, UnsafeRow> iterator() { return new KVIterator<UnsafeRow, UnsafeRow>() { - private final BytesToBytesMap.BytesToBytesMapIterator mapLocationIterator = + private final BytesToBytesMap.MapIterator mapLocationIterator = map.destructiveIterator(); private final UnsafeRow key = new UnsafeRow(); private final UnsafeRow value = new UnsafeRow(); diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java b/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java index 46301f0042..845f2ae685 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java @@ -17,13 +17,13 @@ package org.apache.spark.sql.execution; -import java.io.IOException; - import javax.annotation.Nullable; +import java.io.IOException; import com.google.common.annotations.VisibleForTesting; import org.apache.spark.TaskContext; +import org.apache.spark.memory.TaskMemoryManager; import org.apache.spark.sql.catalyst.expressions.UnsafeRow; import org.apache.spark.sql.catalyst.expressions.codegen.BaseOrdering; import org.apache.spark.sql.catalyst.expressions.codegen.GenerateOrdering; @@ -33,7 +33,6 @@ import org.apache.spark.unsafe.KVIterator; import org.apache.spark.unsafe.Platform; import org.apache.spark.unsafe.map.BytesToBytesMap; import org.apache.spark.unsafe.memory.MemoryBlock; -import org.apache.spark.memory.TaskMemoryManager; import org.apache.spark.util.collection.unsafe.sort.*; /** @@ -84,18 +83,16 @@ public final class UnsafeKVExternalSorter { /* initialSize */ 4096, pageSizeBytes); } else { - // Insert the records into the in-memory sorter. - // We will use the number of elements in the map as the initialSize of the - // UnsafeInMemorySorter. Because UnsafeInMemorySorter does not accept 0 as the initialSize, - // we will use 1 as its initial size if the map is empty. - // TODO: track pointer array memory used by this in-memory sorter! (SPARK-10474) + // The memory needed for UnsafeInMemorySorter should be less than longArray in map. + map.freeArray(); + // The memory used by UnsafeInMemorySorter will be counted later (end of this block) final UnsafeInMemorySorter inMemSorter = new UnsafeInMemorySorter( taskMemoryManager, recordComparator, prefixComparator, Math.max(1, map.numElements())); // We cannot use the destructive iterator here because we are reusing the existing memory // pages in BytesToBytesMap to hold records during sorting. // The only new memory we are allocating is the pointer/prefix array. - BytesToBytesMap.BytesToBytesMapIterator iter = map.iterator(); + BytesToBytesMap.MapIterator iter = map.iterator(); final int numKeyFields = keySchema.size(); UnsafeRow row = new UnsafeRow(); while (iter.hasNext()) { @@ -117,7 +114,7 @@ public final class UnsafeKVExternalSorter { } sorter = UnsafeExternalSorter.createWithExistingInMemorySorter( - taskContext.taskMemoryManager(), + taskMemoryManager, blockManager, taskContext, new KVComparator(ordering, keySchema.length()), @@ -128,6 +125,8 @@ public final class UnsafeKVExternalSorter { sorter.spill(); map.free(); + // counting the memory used UnsafeInMemorySorter + taskMemoryManager.acquireExecutionMemory(inMemSorter.getMemoryUsage(), sorter); } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMapSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMapSuite.scala index dbf4863b76..a38623623a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMapSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMapSuite.scala @@ -24,7 +24,7 @@ import scala.util.{Try, Random} import org.scalatest.Matchers import org.apache.spark.{SparkConf, TaskContextImpl, TaskContext, SparkFunSuite} -import org.apache.spark.memory.{TaskMemoryManager, GrantEverythingMemoryManager} +import org.apache.spark.memory.{TaskMemoryManager, TestMemoryManager} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.{UnsafeRow, UnsafeProjection} import org.apache.spark.sql.test.SharedSQLContext @@ -48,7 +48,7 @@ class UnsafeFixedWidthAggregationMapSuite private def emptyAggregationBuffer: InternalRow = InternalRow(0) private val PAGE_SIZE_BYTES: Long = 1L << 26; // 64 megabytes - private var memoryManager: GrantEverythingMemoryManager = null + private var memoryManager: TestMemoryManager = null private var taskMemoryManager: TaskMemoryManager = null def testWithMemoryLeakDetection(name: String)(f: => Unit) { @@ -62,7 +62,7 @@ class UnsafeFixedWidthAggregationMapSuite test(name) { val conf = new SparkConf().set("spark.unsafe.offHeap", "false") - memoryManager = new GrantEverythingMemoryManager(conf) + memoryManager = new TestMemoryManager(conf) taskMemoryManager = new TaskMemoryManager(memoryManager, 0) TaskContext.setTaskContext(new TaskContextImpl( @@ -193,10 +193,6 @@ class UnsafeFixedWidthAggregationMapSuite // Convert the map into a sorter val sorter = map.destructAndCreateExternalSorter() - withClue(s"destructAndCreateExternalSorter should release memory used by the map") { - assert(taskMemoryManager.getMemoryConsumptionForThisTask() === initialMemoryConsumption) - } - // Add more keys to the sorter and make sure the results come out sorted. val additionalKeys = randomStrings(1024) val keyConverter = UnsafeProjection.create(groupKeySchema) @@ -208,7 +204,7 @@ class UnsafeFixedWidthAggregationMapSuite sorter.insertKV(keyConverter.apply(k), valueConverter.apply(v)) if ((i % 100) == 0) { - memoryManager.markExecutionAsOutOfMemory() + memoryManager.markExecutionAsOutOfMemoryOnce() sorter.closeCurrentPage() } } @@ -251,7 +247,7 @@ class UnsafeFixedWidthAggregationMapSuite sorter.insertKV(keyConverter.apply(k), valueConverter.apply(v)) if ((i % 100) == 0) { - memoryManager.markExecutionAsOutOfMemory() + memoryManager.markExecutionAsOutOfMemoryOnce() sorter.closeCurrentPage() } } @@ -294,16 +290,12 @@ class UnsafeFixedWidthAggregationMapSuite // Convert the map into a sorter. Right now, it contains one record. val sorter = map.destructAndCreateExternalSorter() - withClue(s"destructAndCreateExternalSorter should release memory used by the map") { - assert(taskMemoryManager.getMemoryConsumptionForThisTask() === initialMemoryConsumption) - } - // Add more keys to the sorter and make sure the results come out sorted. (1 to 4096).foreach { i => sorter.insertKV(UnsafeRow.createFromByteArray(0, 0), UnsafeRow.createFromByteArray(0, 0)) if ((i % 100) == 0) { - memoryManager.markExecutionAsOutOfMemory() + memoryManager.markExecutionAsOutOfMemoryOnce() sorter.closeCurrentPage() } } @@ -342,7 +334,7 @@ class UnsafeFixedWidthAggregationMapSuite buf.setInt(0, str.length) } // Simulate running out of space - memoryManager.markExecutionAsOutOfMemory() + memoryManager.limit(0) val str = rand.nextString(1024) val buf = map.getAggregationBuffer(InternalRow(UTF8String.fromString(str))) assert(buf == null) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeKVExternalSorterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeKVExternalSorterSuite.scala index 13dc1754c9..7b80963ec8 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeKVExternalSorterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeKVExternalSorterSuite.scala @@ -20,7 +20,7 @@ package org.apache.spark.sql.execution import scala.util.Random import org.apache.spark._ -import org.apache.spark.memory.{TaskMemoryManager, GrantEverythingMemoryManager} +import org.apache.spark.memory.{TaskMemoryManager, TestMemoryManager} import org.apache.spark.sql.{RandomDataGenerator, Row} import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow} import org.apache.spark.sql.catalyst.expressions.{InterpretedOrdering, UnsafeRow, UnsafeProjection} @@ -109,7 +109,7 @@ class UnsafeKVExternalSorterSuite extends SparkFunSuite with SharedSQLContext { pageSize: Long, spill: Boolean): Unit = { val memoryManager = - new GrantEverythingMemoryManager(new SparkConf().set("spark.unsafe.offHeap", "false")) + new TestMemoryManager(new SparkConf().set("spark.unsafe.offHeap", "false")) val taskMemMgr = new TaskMemoryManager(memoryManager, 0) TaskContext.setTaskContext(new TaskContextImpl( stageId = 0, @@ -128,7 +128,7 @@ class UnsafeKVExternalSorterSuite extends SparkFunSuite with SharedSQLContext { sorter.insertKV(k.asInstanceOf[UnsafeRow], v.asInstanceOf[UnsafeRow]) // 1% chance we will spill if (rand.nextDouble() < 0.01 && spill) { - memoryManager.markExecutionAsOutOfMemory() + memoryManager.markExecutionAsOutOfMemoryOnce() sorter.closeCurrentPage() } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIteratorSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIteratorSuite.scala deleted file mode 100644 index 475037bd45..0000000000 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIteratorSuite.scala +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.execution.aggregate - -import org.apache.spark._ -import org.apache.spark.memory.TaskMemoryManager -import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.catalyst.expressions.InterpretedMutableProjection -import org.apache.spark.sql.execution.metric.SQLMetrics -import org.apache.spark.sql.test.SharedSQLContext - -class TungstenAggregationIteratorSuite extends SparkFunSuite with SharedSQLContext { - - test("memory acquired on construction") { - val taskMemoryManager = new TaskMemoryManager(SparkEnv.get.memoryManager, 0) - val taskContext = new TaskContextImpl(0, 0, 0, 0, taskMemoryManager, null, Seq.empty) - TaskContext.setTaskContext(taskContext) - - // Assert that a page is allocated before processing starts - var iter: TungstenAggregationIterator = null - try { - val newMutableProjection = (expr: Seq[Expression], schema: Seq[Attribute]) => { - () => new InterpretedMutableProjection(expr, schema) - } - val dummyAccum = SQLMetrics.createLongMetric(sparkContext, "dummy") - iter = new TungstenAggregationIterator(Seq.empty, Seq.empty, Seq.empty, Seq.empty, Seq.empty, - 0, Seq.empty, newMutableProjection, Seq.empty, None, - dummyAccum, dummyAccum, dummyAccum, dummyAccum) - val numPages = iter.getHashMap.getNumDataPages - assert(numPages === 1) - } finally { - // Clean up - if (iter != null) { - iter.free() - } - TaskContext.unset() - } - } -} |