aboutsummaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
authorAndrew Or <andrew@databricks.com>2015-09-23 19:34:31 -0700
committerAndrew Or <andrew@databricks.com>2015-09-23 19:34:31 -0700
commit83f6f54d12a418f5158ee7ee985b54eef8cc1cf0 (patch)
tree2267010df342a512c68a11ee63f5dd2bf655d7f0 /sql
parent084e4e126211d74a79e8dbd2d0e604dd3c650822 (diff)
downloadspark-83f6f54d12a418f5158ee7ee985b54eef8cc1cf0.tar.gz
spark-83f6f54d12a418f5158ee7ee985b54eef8cc1cf0.tar.bz2
spark-83f6f54d12a418f5158ee7ee985b54eef8cc1cf0.zip
[SPARK-10474] [SQL] Aggregation fails to allocate memory for pointer array (round 2)
This patch reverts most of the changes in a previous fix #8827. The real cause of the issue is that in `TungstenAggregate`'s prepare method we only reserve 1 page, but later when we switch to sort-based aggregation we try to acquire 1 page AND a pointer array. The longer-term fix should be to reserve also the pointer array, but for now ***we will simply not track the pointer array***. (Note that elsewhere we already don't track the pointer array, e.g. [here](https://github.com/apache/spark/blob/a18208047f06a4244703c17023bb20cbe1f59d73/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java#L88)) Note: This patch reuses the unit test added in #8827 so it doesn't show up in the diff. Author: Andrew Or <andrew@databricks.com> Closes #8888 from andrewor14/dont-track-pointer-array.
Diffstat (limited to 'sql')
-rw-r--r--sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java9
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMapSuite.scala8
2 files changed, 4 insertions, 13 deletions
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java b/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java
index b81f67a16b..9df5780e4f 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java
@@ -85,7 +85,7 @@ public final class UnsafeKVExternalSorter {
// We will use the number of elements in the map as the initialSize of the
// UnsafeInMemorySorter. Because UnsafeInMemorySorter does not accept 0 as the initialSize,
// we will use 1 as its initial size if the map is empty.
- // TODO: track pointer array memory used by this in-memory sorter!
+ // TODO: track pointer array memory used by this in-memory sorter! (SPARK-10474)
final UnsafeInMemorySorter inMemSorter = new UnsafeInMemorySorter(
taskMemoryManager, recordComparator, prefixComparator, Math.max(1, map.numElements()));
@@ -124,13 +124,8 @@ public final class UnsafeKVExternalSorter {
pageSizeBytes,
inMemSorter);
- // Note: This spill doesn't actually release any memory, so if we try to allocate a new
- // pointer array immediately after the spill then we may fail to acquire sufficient space
- // for it (SPARK-10474). For this reason, we must initialize for writing explicitly *after*
- // we have actually freed memory from our map.
- sorter.spill(false /* initialize for writing */);
+ sorter.spill();
map.free();
- sorter.initializeForWriting();
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMapSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMapSuite.scala
index ada4d42f99..1739798a24 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMapSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMapSuite.scala
@@ -200,9 +200,7 @@ class UnsafeFixedWidthAggregationMapSuite
val sorter = map.destructAndCreateExternalSorter()
withClue(s"destructAndCreateExternalSorter should release memory used by the map") {
- // 4096 * 16 is the initial size allocated for the pointer/prefix array in the in-mem sorter.
- assert(shuffleMemoryManager.getMemoryConsumptionForThisTask() ===
- initialMemoryConsumption + 4096 * 16)
+ assert(shuffleMemoryManager.getMemoryConsumptionForThisTask() === initialMemoryConsumption)
}
// Add more keys to the sorter and make sure the results come out sorted.
@@ -305,9 +303,7 @@ class UnsafeFixedWidthAggregationMapSuite
val sorter = map.destructAndCreateExternalSorter()
withClue(s"destructAndCreateExternalSorter should release memory used by the map") {
- // 4096 * 16 is the initial size allocated for the pointer/prefix array in the in-mem sorter.
- assert(shuffleMemoryManager.getMemoryConsumptionForThisTask() ===
- initialMemoryConsumption + 4096 * 16)
+ assert(shuffleMemoryManager.getMemoryConsumptionForThisTask() === initialMemoryConsumption)
}
// Add more keys to the sorter and make sure the results come out sorted.