author	Andrew Or <andrew@databricks.com>	2015-09-18 23:58:25 -0700
committer	Andrew Or <andrew@databricks.com>	2015-09-18 23:58:25 -0700
commit	7ff8d68cc19299e16dedfd819b9e96480fa6cf44 (patch)
tree	bf4851837bc99d35cdc6ac2c00ad46511835affb /sql
parent	22be2ae147a111e88896f6fb42ed46bbf108a99b (diff)
[SPARK-10474] [SQL] Aggregation fails to allocate memory for pointer array
When `TungstenAggregation` hits memory pressure, it switches from hash-based to sort-based aggregation in place. However, in the process we try to allocate the pointer array for writing to the new `UnsafeExternalSorter` *before* actually freeing the memory from the hash map. This led to the following exception:

```
java.io.IOException: Could not acquire 65536 bytes of memory
	at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.initializeForWriting(UnsafeExternalSorter.java:169)
	at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.spill(UnsafeExternalSorter.java:220)
	at org.apache.spark.sql.execution.UnsafeKVExternalSorter.<init>(UnsafeKVExternalSorter.java:126)
	at org.apache.spark.sql.execution.UnsafeFixedWidthAggregationMap.destructAndCreateExternalSorter(UnsafeFixedWidthAggregationMap.java:257)
	at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.switchToSortBasedAggregation(TungstenAggregationIterator.scala:435)
```

Author: Andrew Or <andrew@databricks.com>

Closes #8827 from andrewor14/allocate-pointer-array.
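The fix reorders the conversion into three steps: spill without re-initializing the sorter, free the hash map, and only then acquire the new pointer array. A minimal sketch of why that ordering matters under a fixed memory budget; every name below is hypothetical and only mimics the shape of the real APIs:

```java
// Toy fixed-budget allocator, standing in for a per-task memory manager.
final class ToyMemoryManager {
  private long available;
  ToyMemoryManager(long budget) { available = budget; }
  // Grants at most what is left; returns the number of bytes actually acquired.
  long tryToAcquire(long bytes) {
    long granted = Math.min(bytes, available);
    available -= granted;
    return granted;
  }
  void release(long bytes) { available += bytes; }
}

public class Spark10474Sketch {
  public static void main(String[] args) {
    ToyMemoryManager mm = new ToyMemoryManager(65536);
    long mapPages = mm.tryToAcquire(65536);  // the hash map holds the entire budget

    // Buggy ordering: allocate the pointer array while the map still owns its pages.
    long got = mm.tryToAcquire(65536);       // grants 0 bytes; Spark threw IOException here
    System.out.println("allocate-then-free: got " + got + " bytes");

    // Fixed ordering: release the map's pages first, then allocate.
    mm.release(mapPages);
    got = mm.tryToAcquire(65536);            // grants all 65536 bytes
    System.out.println("free-then-allocate: got " + got + " bytes");
  }
}
```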
Diffstat (limited to 'sql')
-rw-r--r--  sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java               |  8
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMapSuite.scala | 49
2 files changed, 54 insertions(+), 3 deletions(-)
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java b/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java
index 7db6b7ff50..b81f67a16b 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java
@@ -85,6 +85,7 @@ public final class UnsafeKVExternalSorter {
       // We will use the number of elements in the map as the initialSize of the
       // UnsafeInMemorySorter. Because UnsafeInMemorySorter does not accept 0 as the initialSize,
       // we will use 1 as its initial size if the map is empty.
+      // TODO: track pointer array memory used by this in-memory sorter!
       final UnsafeInMemorySorter inMemSorter = new UnsafeInMemorySorter(
         taskMemoryManager, recordComparator, prefixComparator, Math.max(1, map.numElements()));
@@ -123,8 +124,13 @@ public final class UnsafeKVExternalSorter {
         pageSizeBytes,
         inMemSorter);
 
-      sorter.spill();
+      // Note: This spill doesn't actually release any memory, so if we try to allocate a new
+      // pointer array immediately after the spill then we may fail to acquire sufficient space
+      // for it (SPARK-10474). For this reason, we must initialize for writing explicitly *after*
+      // we have actually freed memory from our map.
+      sorter.spill(false /* initialize for writing */);
       map.free();
+      sorter.initializeForWriting();
     }
   }
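A note on the design choice visible in this hunk: the old no-argument `spill()` evidently re-acquired the pointer array as part of spilling, so the constructor had no way to sequence that allocation after `map.free()`. Splitting the operation into `spill(false /* initialize for writing */)` plus an explicit `initializeForWriting()` hands that control to the caller, at the cost of the caller having to remember the second step.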
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMapSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMapSuite.scala
index d1f0b2b1fc..ada4d42f99 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMapSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMapSuite.scala
@@ -23,9 +23,10 @@ import scala.util.{Try, Random}
 import org.scalatest.Matchers
 
-import org.apache.spark.sql.catalyst.expressions.{UnsafeRow, UnsafeProjection}
 import org.apache.spark.{TaskContextImpl, TaskContext, SparkFunSuite}
+import org.apache.spark.shuffle.ShuffleMemoryManager
 import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{UnsafeRow, UnsafeProjection}
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.memory.{ExecutorMemoryManager, MemoryAllocator, TaskMemoryManager}
@@ -325,7 +326,7 @@ class UnsafeFixedWidthAggregationMapSuite
       // At here, we also test if copy is correct.
       iter.getKey.copy()
       iter.getValue.copy()
-      count += 1;
+      count += 1
     }
 
     // 1 record was from the map and 4096 records were explicitly inserted.
@@ -333,4 +334,48 @@ class UnsafeFixedWidthAggregationMapSuite
     map.free()
   }
+
+  testWithMemoryLeakDetection("convert to external sorter under memory pressure (SPARK-10474)") {
+    val smm = ShuffleMemoryManager.createForTesting(65536)
+    val pageSize = 4096
+    val map = new UnsafeFixedWidthAggregationMap(
+      emptyAggregationBuffer,
+      aggBufferSchema,
+      groupKeySchema,
+      taskMemoryManager,
+      smm,
+      128, // initial capacity
+      pageSize,
+      false // disable perf metrics
+    )
+
+    // Insert into the map until we've run out of space
+    val rand = new Random(42)
+    var hasSpace = true
+    while (hasSpace) {
+      val str = rand.nextString(1024)
+      val buf = map.getAggregationBuffer(InternalRow(UTF8String.fromString(str)))
+      if (buf == null) {
+        hasSpace = false
+      } else {
+        buf.setInt(0, str.length)
+      }
+    }
+
+    // Ensure we're actually maxed out by asserting that we can't acquire even just 1 byte
+    assert(smm.tryToAcquire(1) === 0)
+
+    // Convert the map into a sorter. This used to fail before the fix for SPARK-10474
+    // because we would try to acquire space for the in-memory sorter pointer array before
+    // actually releasing the pages despite having spilled all of them.
+    var sorter: UnsafeKVExternalSorter = null
+    try {
+      sorter = map.destructAndCreateExternalSorter()
+    } finally {
+      if (sorter != null) {
+        sorter.cleanupResources()
+      }
+    }
+  }
+
 
 }
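A note on the test's strategy: it caps the `ShuffleMemoryManager` at 65536 bytes, the same amount the failed allocation in the stack trace requested, fills the map until `getAggregationBuffer` returns null, and asserts that not even one more byte can be acquired. From that state, `destructAndCreateExternalSorter()` can only succeed if the map's pages are released before the sorter's pointer array is allocated, which is exactly the ordering the fix enforces.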