aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java14
-rw-r--r--sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java8
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMapSuite.scala49
3 files changed, 66 insertions, 5 deletions
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
index fc364e0a89..14b6aafdea 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
@@ -159,7 +159,7 @@ public final class UnsafeExternalSorter {
/**
* Allocates new sort data structures. Called when creating the sorter and after each spill.
*/
- private void initializeForWriting() throws IOException {
+ public void initializeForWriting() throws IOException {
this.writeMetrics = new ShuffleWriteMetrics();
final long pointerArrayMemory =
UnsafeInMemorySorter.getMemoryRequirementsForPointerArray(initialSize);
@@ -187,6 +187,14 @@ public final class UnsafeExternalSorter {
* Sort and spill the current records in response to memory pressure.
*/
public void spill() throws IOException {
+ spill(true);
+ }
+
+ /**
+ * Sort and spill the current records in response to memory pressure.
+ * @param shouldInitializeForWriting whether to allocate memory for writing after the spill
+ */
+ public void spill(boolean shouldInitializeForWriting) throws IOException {
assert(inMemSorter != null);
logger.info("Thread {} spilling sort data of {} to disk ({} {} so far)",
Thread.currentThread().getId(),
@@ -217,7 +225,9 @@ public final class UnsafeExternalSorter {
// written to disk. This also counts the space needed to store the sorter's pointer array.
taskContext.taskMetrics().incMemoryBytesSpilled(spillSize);
- initializeForWriting();
+ if (shouldInitializeForWriting) {
+ initializeForWriting();
+ }
}
/**
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java b/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java
index 7db6b7ff50..b81f67a16b 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java
@@ -85,6 +85,7 @@ public final class UnsafeKVExternalSorter {
// We will use the number of elements in the map as the initialSize of the
// UnsafeInMemorySorter. Because UnsafeInMemorySorter does not accept 0 as the initialSize,
// we will use 1 as its initial size if the map is empty.
+ // TODO: track pointer array memory used by this in-memory sorter!
final UnsafeInMemorySorter inMemSorter = new UnsafeInMemorySorter(
taskMemoryManager, recordComparator, prefixComparator, Math.max(1, map.numElements()));
@@ -123,8 +124,13 @@ public final class UnsafeKVExternalSorter {
pageSizeBytes,
inMemSorter);
- sorter.spill();
+ // Note: This spill doesn't actually release any memory, so if we try to allocate a new
+ // pointer array immediately after the spill then we may fail to acquire sufficient space
+ // for it (SPARK-10474). For this reason, we must initialize for writing explicitly *after*
+ // we have actually freed memory from our map.
+ sorter.spill(false /* initialize for writing */);
map.free();
+ sorter.initializeForWriting();
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMapSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMapSuite.scala
index d1f0b2b1fc..ada4d42f99 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMapSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMapSuite.scala
@@ -23,9 +23,10 @@ import scala.util.{Try, Random}
import org.scalatest.Matchers
-import org.apache.spark.sql.catalyst.expressions.{UnsafeRow, UnsafeProjection}
import org.apache.spark.{TaskContextImpl, TaskContext, SparkFunSuite}
+import org.apache.spark.shuffle.ShuffleMemoryManager
import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{UnsafeRow, UnsafeProjection}
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.memory.{ExecutorMemoryManager, MemoryAllocator, TaskMemoryManager}
@@ -325,7 +326,7 @@ class UnsafeFixedWidthAggregationMapSuite
// At here, we also test if copy is correct.
iter.getKey.copy()
iter.getValue.copy()
- count += 1;
+ count += 1
}
// 1 record was from the map and 4096 records were explicitly inserted.
@@ -333,4 +334,48 @@ class UnsafeFixedWidthAggregationMapSuite
map.free()
}
+
+ testWithMemoryLeakDetection("convert to external sorter under memory pressure (SPARK-10474)") {
+ val smm = ShuffleMemoryManager.createForTesting(65536)
+ val pageSize = 4096
+ val map = new UnsafeFixedWidthAggregationMap(
+ emptyAggregationBuffer,
+ aggBufferSchema,
+ groupKeySchema,
+ taskMemoryManager,
+ smm,
+ 128, // initial capacity
+ pageSize,
+ false // disable perf metrics
+ )
+
+ // Insert into the map until we've run out of space
+ val rand = new Random(42)
+ var hasSpace = true
+ while (hasSpace) {
+ val str = rand.nextString(1024)
+ val buf = map.getAggregationBuffer(InternalRow(UTF8String.fromString(str)))
+ if (buf == null) {
+ hasSpace = false
+ } else {
+ buf.setInt(0, str.length)
+ }
+ }
+
+ // Ensure we're actually maxed out by asserting that we can't acquire even just 1 byte
+ assert(smm.tryToAcquire(1) === 0)
+
+ // Convert the map into a sorter. This used to fail before the fix for SPARK-10474
+ // because we would try to acquire space for the in-memory sorter pointer array before
+ // actually releasing the pages despite having spilled all of them.
+ var sorter: UnsafeKVExternalSorter = null
+ try {
+ sorter = map.destructAndCreateExternalSorter()
+ } finally {
+ if (sorter != null) {
+ sorter.cleanupResources()
+ }
+ }
+ }
+
}