Diffstat (limited to 'sql')
-rw-r--r--  sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java              |  7
-rw-r--r--  sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMap.java           |  2
-rw-r--r--  sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java                   | 19
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMapSuite.scala    | 22
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeKVExternalSorterSuite.scala            |  6
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIteratorSuite.scala | 54
6 files changed, 21 insertions(+), 89 deletions(-)
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java b/sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java
index 810c74fd2f..f7063d1e5c 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java
@@ -96,15 +96,10 @@ final class UnsafeExternalRowSorter {
);
numRowsInserted++;
if (testSpillFrequency > 0 && (numRowsInserted % testSpillFrequency) == 0) {
- spill();
+ sorter.spill();
}
}
- @VisibleForTesting
- void spill() throws IOException {
- sorter.spill();
- }
-
/**
* Return the peak memory used so far, in bytes.
*/
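
Note on this hunk: the removed @VisibleForTesting spill() wrapper only forwarded to the underlying UnsafeExternalSorter, so the test-spill hook can call sorter.spill() directly. A minimal Scala sketch of the same periodic-spill pattern (testSpillFrequency and numRowsInserted are the fields from the hunk above; the surrounding class is assumed):

    // Force a spill every `testSpillFrequency` insertions; `sorter` is an
    // UnsafeExternalSorter, whose public spill() needs no test-only wrapper.
    numRowsInserted += 1
    if (testSpillFrequency > 0 && numRowsInserted % testSpillFrequency == 0) {
      sorter.spill()
    }
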
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMap.java b/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMap.java
index 82c645df28..889f970034 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMap.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMap.java
@@ -165,7 +165,7 @@ public final class UnsafeFixedWidthAggregationMap {
public KVIterator<UnsafeRow, UnsafeRow> iterator() {
return new KVIterator<UnsafeRow, UnsafeRow>() {
- private final BytesToBytesMap.BytesToBytesMapIterator mapLocationIterator =
+ private final BytesToBytesMap.MapIterator mapLocationIterator =
map.destructiveIterator();
private final UnsafeRow key = new UnsafeRow();
private final UnsafeRow value = new UnsafeRow();
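
Only the declared field type changes here: the nested iterator class was renamed from BytesToBytesMap.BytesToBytesMapIterator to BytesToBytesMap.MapIterator, and call sites stay the same. A hedged Scala sketch of consuming the destructive iterator (the key/value decoding step is elided, since Location's accessors vary by Spark version):

    import org.apache.spark.unsafe.map.BytesToBytesMap

    // `map` is an existing BytesToBytesMap; destructiveIterator() releases
    // pages as they are consumed, so the map cannot be reused afterwards.
    def drain(map: BytesToBytesMap): Unit = {
      val iter: BytesToBytesMap.MapIterator = map.destructiveIterator()
      while (iter.hasNext) {
        val loc = iter.next() // a BytesToBytesMap.Location
        // decode the key and value from `loc` here
      }
    }
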
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java b/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java
index 46301f0042..845f2ae685 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java
@@ -17,13 +17,13 @@
package org.apache.spark.sql.execution;
-import java.io.IOException;
-
import javax.annotation.Nullable;
+import java.io.IOException;
import com.google.common.annotations.VisibleForTesting;
import org.apache.spark.TaskContext;
+import org.apache.spark.memory.TaskMemoryManager;
import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
import org.apache.spark.sql.catalyst.expressions.codegen.BaseOrdering;
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateOrdering;
@@ -33,7 +33,6 @@ import org.apache.spark.unsafe.KVIterator;
import org.apache.spark.unsafe.Platform;
import org.apache.spark.unsafe.map.BytesToBytesMap;
import org.apache.spark.unsafe.memory.MemoryBlock;
-import org.apache.spark.memory.TaskMemoryManager;
import org.apache.spark.util.collection.unsafe.sort.*;
/**
@@ -84,18 +83,16 @@ public final class UnsafeKVExternalSorter {
/* initialSize */ 4096,
pageSizeBytes);
} else {
- // Insert the records into the in-memory sorter.
- // We will use the number of elements in the map as the initialSize of the
- // UnsafeInMemorySorter. Because UnsafeInMemorySorter does not accept 0 as the initialSize,
- // we will use 1 as its initial size if the map is empty.
- // TODO: track pointer array memory used by this in-memory sorter! (SPARK-10474)
+ // The memory needed by UnsafeInMemorySorter should be less than the memory freed by releasing the map's longArray.
+ map.freeArray();
+ // The memory used by UnsafeInMemorySorter will be counted later (end of this block)
final UnsafeInMemorySorter inMemSorter = new UnsafeInMemorySorter(
taskMemoryManager, recordComparator, prefixComparator, Math.max(1, map.numElements()));
// We cannot use the destructive iterator here because we are reusing the existing memory
// pages in BytesToBytesMap to hold records during sorting.
// The only new memory we are allocating is the pointer/prefix array.
- BytesToBytesMap.BytesToBytesMapIterator iter = map.iterator();
+ BytesToBytesMap.MapIterator iter = map.iterator();
final int numKeyFields = keySchema.size();
UnsafeRow row = new UnsafeRow();
while (iter.hasNext()) {
@@ -117,7 +114,7 @@ public final class UnsafeKVExternalSorter {
}
sorter = UnsafeExternalSorter.createWithExistingInMemorySorter(
- taskContext.taskMemoryManager(),
+ taskMemoryManager,
blockManager,
taskContext,
new KVComparator(ordering, keySchema.length()),
@@ -128,6 +125,8 @@ public final class UnsafeKVExternalSorter {
sorter.spill();
map.free();
+ // Count the memory used by UnsafeInMemorySorter
+ taskMemoryManager.acquireExecutionMemory(inMemSorter.getMemoryUsage(), sorter);
}
}
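
This file carries the substance of the change. The map's long array is freed before UnsafeInMemorySorter allocates its pointer/prefix array, on the expectation that the new array needs less memory than the one just released; once the map's data pages are freed, the sorter's footprint is charged to the TaskMemoryManager, so the previously untracked pointer array (SPARK-10474) is now accounted for. A condensed Scala sketch of the sequence (identifiers are the ones in the hunks above; record copying and sorter construction are elided):

    // 1. Release the map's long array up front; the sorter's pointer array
    //    should fit within the budget this frees.
    map.freeArray()
    val inMemSorter = new UnsafeInMemorySorter(
      taskMemoryManager, recordComparator, prefixComparator,
      math.max(1, map.numElements()))

    // 2. ... copy the map's records into inMemSorter and build `sorter` via
    //    UnsafeExternalSorter.createWithExistingInMemorySorter(...) ...

    // 3. Free the map's data pages, then formally charge the in-memory
    //    sorter's usage to the task's memory manager.
    map.free()
    taskMemoryManager.acquireExecutionMemory(inMemSorter.getMemoryUsage(), sorter)
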
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMapSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMapSuite.scala
index dbf4863b76..a38623623a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMapSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMapSuite.scala
@@ -24,7 +24,7 @@ import scala.util.{Try, Random}
import org.scalatest.Matchers
import org.apache.spark.{SparkConf, TaskContextImpl, TaskContext, SparkFunSuite}
-import org.apache.spark.memory.{TaskMemoryManager, GrantEverythingMemoryManager}
+import org.apache.spark.memory.{TaskMemoryManager, TestMemoryManager}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{UnsafeRow, UnsafeProjection}
import org.apache.spark.sql.test.SharedSQLContext
@@ -48,7 +48,7 @@ class UnsafeFixedWidthAggregationMapSuite
private def emptyAggregationBuffer: InternalRow = InternalRow(0)
private val PAGE_SIZE_BYTES: Long = 1L << 26; // 64 megabytes
- private var memoryManager: GrantEverythingMemoryManager = null
+ private var memoryManager: TestMemoryManager = null
private var taskMemoryManager: TaskMemoryManager = null
def testWithMemoryLeakDetection(name: String)(f: => Unit) {
@@ -62,7 +62,7 @@ class UnsafeFixedWidthAggregationMapSuite
test(name) {
val conf = new SparkConf().set("spark.unsafe.offHeap", "false")
- memoryManager = new GrantEverythingMemoryManager(conf)
+ memoryManager = new TestMemoryManager(conf)
taskMemoryManager = new TaskMemoryManager(memoryManager, 0)
TaskContext.setTaskContext(new TaskContextImpl(
@@ -193,10 +193,6 @@ class UnsafeFixedWidthAggregationMapSuite
// Convert the map into a sorter
val sorter = map.destructAndCreateExternalSorter()
- withClue(s"destructAndCreateExternalSorter should release memory used by the map") {
- assert(taskMemoryManager.getMemoryConsumptionForThisTask() === initialMemoryConsumption)
- }
-
// Add more keys to the sorter and make sure the results come out sorted.
val additionalKeys = randomStrings(1024)
val keyConverter = UnsafeProjection.create(groupKeySchema)
@@ -208,7 +204,7 @@ class UnsafeFixedWidthAggregationMapSuite
sorter.insertKV(keyConverter.apply(k), valueConverter.apply(v))
if ((i % 100) == 0) {
- memoryManager.markExecutionAsOutOfMemory()
+ memoryManager.markExecutionAsOutOfMemoryOnce()
sorter.closeCurrentPage()
}
}
@@ -251,7 +247,7 @@ class UnsafeFixedWidthAggregationMapSuite
sorter.insertKV(keyConverter.apply(k), valueConverter.apply(v))
if ((i % 100) == 0) {
- memoryManager.markExecutionAsOutOfMemory()
+ memoryManager.markExecutionAsOutOfMemoryOnce()
sorter.closeCurrentPage()
}
}
@@ -294,16 +290,12 @@ class UnsafeFixedWidthAggregationMapSuite
// Convert the map into a sorter. Right now, it contains one record.
val sorter = map.destructAndCreateExternalSorter()
- withClue(s"destructAndCreateExternalSorter should release memory used by the map") {
- assert(taskMemoryManager.getMemoryConsumptionForThisTask() === initialMemoryConsumption)
- }
-
// Add more keys to the sorter and make sure the results come out sorted.
(1 to 4096).foreach { i =>
sorter.insertKV(UnsafeRow.createFromByteArray(0, 0), UnsafeRow.createFromByteArray(0, 0))
if ((i % 100) == 0) {
- memoryManager.markExecutionAsOutOfMemory()
+ memoryManager.markExecutionAsOutOfMemoryOnce()
sorter.closeCurrentPage()
}
}
@@ -342,7 +334,7 @@ class UnsafeFixedWidthAggregationMapSuite
buf.setInt(0, str.length)
}
// Simulate running out of space
- memoryManager.markExecutionAsOutOfMemory()
+ memoryManager.limit(0)
val str = rand.nextString(1024)
val buf = map.getAggregationBuffer(InternalRow(UTF8String.fromString(str)))
assert(buf == null)
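
The test-side counterpart: GrantEverythingMemoryManager becomes TestMemoryManager, which (per the hunks above) can fail exactly one allocation via markExecutionAsOutOfMemoryOnce() or cap the budget via limit(). A minimal Scala sketch of the spill-forcing pattern the suite uses (`sorter` is assumed to be an already-constructed UnsafeKVExternalSorter):

    import org.apache.spark.SparkConf
    import org.apache.spark.memory.{TaskMemoryManager, TestMemoryManager}

    val memoryManager = new TestMemoryManager(
      new SparkConf().set("spark.unsafe.offHeap", "false"))
    val taskMemoryManager = new TaskMemoryManager(memoryManager, 0)

    // Fail exactly the next acquisition: closing the current page makes the
    // next insert request a fresh page, that request fails once and triggers
    // a spill, and subsequent requests succeed again.
    memoryManager.markExecutionAsOutOfMemoryOnce()
    sorter.closeCurrentPage()

    // Or cap the budget so every later request fails, e.g. to drive
    // getAggregationBuffer(...) down its out-of-space (null) path.
    memoryManager.limit(0)
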
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeKVExternalSorterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeKVExternalSorterSuite.scala
index 13dc1754c9..7b80963ec8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeKVExternalSorterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeKVExternalSorterSuite.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution
import scala.util.Random
import org.apache.spark._
-import org.apache.spark.memory.{TaskMemoryManager, GrantEverythingMemoryManager}
+import org.apache.spark.memory.{TaskMemoryManager, TestMemoryManager}
import org.apache.spark.sql.{RandomDataGenerator, Row}
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
import org.apache.spark.sql.catalyst.expressions.{InterpretedOrdering, UnsafeRow, UnsafeProjection}
@@ -109,7 +109,7 @@ class UnsafeKVExternalSorterSuite extends SparkFunSuite with SharedSQLContext {
pageSize: Long,
spill: Boolean): Unit = {
val memoryManager =
- new GrantEverythingMemoryManager(new SparkConf().set("spark.unsafe.offHeap", "false"))
+ new TestMemoryManager(new SparkConf().set("spark.unsafe.offHeap", "false"))
val taskMemMgr = new TaskMemoryManager(memoryManager, 0)
TaskContext.setTaskContext(new TaskContextImpl(
stageId = 0,
@@ -128,7 +128,7 @@ class UnsafeKVExternalSorterSuite extends SparkFunSuite with SharedSQLContext {
sorter.insertKV(k.asInstanceOf[UnsafeRow], v.asInstanceOf[UnsafeRow])
// 1% chance we will spill
if (rand.nextDouble() < 0.01 && spill) {
- memoryManager.markExecutionAsOutOfMemory()
+ memoryManager.markExecutionAsOutOfMemoryOnce()
sorter.closeCurrentPage()
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIteratorSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIteratorSuite.scala
deleted file mode 100644
index 475037bd45..0000000000
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIteratorSuite.scala
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.aggregate
-
-import org.apache.spark._
-import org.apache.spark.memory.TaskMemoryManager
-import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.expressions.InterpretedMutableProjection
-import org.apache.spark.sql.execution.metric.SQLMetrics
-import org.apache.spark.sql.test.SharedSQLContext
-
-class TungstenAggregationIteratorSuite extends SparkFunSuite with SharedSQLContext {
-
- test("memory acquired on construction") {
- val taskMemoryManager = new TaskMemoryManager(SparkEnv.get.memoryManager, 0)
- val taskContext = new TaskContextImpl(0, 0, 0, 0, taskMemoryManager, null, Seq.empty)
- TaskContext.setTaskContext(taskContext)
-
- // Assert that a page is allocated before processing starts
- var iter: TungstenAggregationIterator = null
- try {
- val newMutableProjection = (expr: Seq[Expression], schema: Seq[Attribute]) => {
- () => new InterpretedMutableProjection(expr, schema)
- }
- val dummyAccum = SQLMetrics.createLongMetric(sparkContext, "dummy")
- iter = new TungstenAggregationIterator(Seq.empty, Seq.empty, Seq.empty, Seq.empty, Seq.empty,
- 0, Seq.empty, newMutableProjection, Seq.empty, None,
- dummyAccum, dummyAccum, dummyAccum, dummyAccum)
- val numPages = iter.getHashMap.getNumDataPages
- assert(numPages === 1)
- } finally {
- // Clean up
- if (iter != null) {
- iter.free()
- }
- TaskContext.unset()
- }
- }
-}