aboutsummaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
authorDavies Liu <davies@databricks.com>2017-02-17 09:38:06 -0800
committerDavies Liu <davies.liu@gmail.com>2017-02-17 09:38:06 -0800
commit3d0c3af0a76757c20e429c38efa4f14a15c9097a (patch)
tree06849f8d9e616cbeac7ec26a043c708e385550f1 /sql
parent021062af099d06b4b0095c677b3a81d21f867a9d (diff)
downloadspark-3d0c3af0a76757c20e429c38efa4f14a15c9097a.tar.gz
spark-3d0c3af0a76757c20e429c38efa4f14a15c9097a.tar.bz2
spark-3d0c3af0a76757c20e429c38efa4f14a15c9097a.zip
[SPARK-19500] [SQL] Fix off-by-one bug in BytesToBytesMap
## What changes were proposed in this pull request? Radix sort requires that half of the array be free (as temporary space), so we use 0.5 as the scale factor to make sure that BytesToBytesMap will not have more items than 1/2 of capacity. It turned out this is not true: the current implementation of append() could leave 1 more item than the threshold (1/2 of capacity) in the array, which breaks the requirement of radix sort (failing the assert in 2.2, or failing to insert into InMemorySorter in 2.1). This PR fixes the off-by-one bug in BytesToBytesMap. This PR also fixes a bug where the array would never grow if it failed to grow once (staying at the initial capacity), introduced by #15722 . ## How was this patch tested? Added regression test. Author: Davies Liu <davies@databricks.com> Closes #16844 from davies/off_by_one.
Diffstat (limited to 'sql')
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMapSuite.scala | 40
1 file changed, 40 insertions(+), 0 deletions(-)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMapSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMapSuite.scala
index c1555114e8..6cf18de0cc 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMapSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMapSuite.scala
@@ -342,4 +342,44 @@ class UnsafeFixedWidthAggregationMapSuite
}
}
+ testWithMemoryLeakDetection("convert to external sorter after fail to grow (SPARK-19500)") {
+ val pageSize = 4096000
+ val map = new UnsafeFixedWidthAggregationMap(
+ emptyAggregationBuffer,
+ aggBufferSchema,
+ groupKeySchema,
+ taskMemoryManager,
+ 128, // initial capacity
+ pageSize,
+ false // disable perf metrics
+ )
+
+ val rand = new Random(42)
+ for (i <- 1 to 63) {
+ val str = rand.nextString(1024)
+ val buf = map.getAggregationBuffer(InternalRow(UTF8String.fromString(str)))
+ buf.setInt(0, str.length)
+ }
+ // Simulate running out of space
+ memoryManager.limit(0)
+ var str = rand.nextString(1024)
+ var buf = map.getAggregationBuffer(InternalRow(UTF8String.fromString(str)))
+ assert(buf != null)
+ str = rand.nextString(1024)
+ buf = map.getAggregationBuffer(InternalRow(UTF8String.fromString(str)))
+ assert(buf == null)
+
+ // Convert the map into a sorter. This used to fail before the fix for SPARK-10474
+ // because we would try to acquire space for the in-memory sorter pointer array before
+ // actually releasing the pages despite having spilled all of them.
+ var sorter: UnsafeKVExternalSorter = null
+ try {
+ sorter = map.destructAndCreateExternalSorter()
+ map.free()
+ } finally {
+ if (sorter != null) {
+ sorter.cleanupResources()
+ }
+ }
+ }
}