about summary refs log tree commit diff
path: root/sql/catalyst
diff options
context:
space:
mode:
author: Reynold Xin <rxin@databricks.com> 2015-08-02 12:32:14 -0700
committer: Josh Rosen <joshrosen@databricks.com> 2015-08-02 12:32:14 -0700
commit: 2e981b7bfa9dec93fdcf25f3e7220cd6aaba744f (patch)
tree: f7458ae297d36bba1acf21fd08169defef6c2ef8 /sql/catalyst
parent: 66924ffa6bdb8e0df1b90b789cb7ad443377e729 (diff)
download: spark-2e981b7bfa9dec93fdcf25f3e7220cd6aaba744f.tar.gz
          spark-2e981b7bfa9dec93fdcf25f3e7220cd6aaba744f.tar.bz2
          spark-2e981b7bfa9dec93fdcf25f3e7220cd6aaba744f.zip
[SPARK-9531] [SQL] UnsafeFixedWidthAggregationMap.destructAndCreateExternalSorter
This pull request adds a destructAndCreateExternalSorter method to UnsafeFixedWidthAggregationMap. The new method does the following: 1. Creates a new external sorter UnsafeKVExternalSorter 2. Adds all the data into an in-memory sorter, sorts them 3. Spills the sorted in-memory data to disk This method can be used to fallback to sort-based aggregation when under memory pressure. The pull request also includes accounting fixes from JoshRosen. TODOs (that can be done in follow-up PRs) - [x] Address Josh's feedbacks from #7849 - [x] More documentation and test cases - [x] Make sure we are doing memory accounting correctly with test cases (e.g. did we release the memory in BytesToBytesMap twice?) - [ ] Look harder at possible memory leaks and exception handling - [ ] Randomized tester for the KV sorter as well as the aggregation map Author: Reynold Xin <rxin@databricks.com> Author: Josh Rosen <joshrosen@databricks.com> Closes #7860 from rxin/kvsorter and squashes the following commits: 986a58c [Reynold Xin] Bug fix. 599317c [Reynold Xin] Style fix and slightly more compact code. fe7bd4e [Reynold Xin] Bug fixes. fd71bef [Reynold Xin] Merge remote-tracking branch 'josh/large-records-in-sql-sorter' into kvsorter-with-josh-fix 3efae38 [Reynold Xin] More fixes and documentation. 45f1b09 [Josh Rosen] Ensure that spill files are cleaned up f6a9bd3 [Reynold Xin] Josh feedback. 9be8139 [Reynold Xin] Remove testSpillFrequency. 7cbe759 [Reynold Xin] [SPARK-9531][SQL] UnsafeFixedWidthAggregationMap.destructAndCreateExternalSorter. ae4a8af [Josh Rosen] Detect leaked unsafe memory in UnsafeExternalSorterSuite. 52f9b06 [Josh Rosen] Detect ShuffleMemoryManager leaks in UnsafeExternalSorter.
Diffstat (limited to 'sql/catalyst')
-rw-r--r-- sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java | 3
-rw-r--r-- sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java | 9
2 files changed, 7 insertions, 5 deletions
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
index 1b475b2492..b4fc0b7b70 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
@@ -507,7 +507,8 @@ public final class UnsafeRow extends MutableRow {
public String toString() {
StringBuilder build = new StringBuilder("[");
for (int i = 0; i < sizeInBytes; i += 8) {
- build.append(PlatformDependent.UNSAFE.getLong(baseObject, baseOffset + i));
+ build.append(java.lang.Long.toHexString(
+ PlatformDependent.UNSAFE.getLong(baseObject, baseOffset + i)));
build.append(',');
}
build.append(']');
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java b/sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java
index 68c49feae9..5e4c6232c9 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java
@@ -59,20 +59,21 @@ final class UnsafeExternalRowSorter {
StructType schema,
Ordering<InternalRow> ordering,
PrefixComparator prefixComparator,
- PrefixComputer prefixComputer) throws IOException {
+ PrefixComputer prefixComputer,
+ long pageSizeBytes) throws IOException {
this.schema = schema;
this.prefixComputer = prefixComputer;
final SparkEnv sparkEnv = SparkEnv.get();
final TaskContext taskContext = TaskContext.get();
- sorter = new UnsafeExternalSorter(
+ sorter = UnsafeExternalSorter.create(
taskContext.taskMemoryManager(),
sparkEnv.shuffleMemoryManager(),
sparkEnv.blockManager(),
taskContext,
new RowComparator(ordering, schema.length()),
prefixComparator,
- 4096,
- sparkEnv.conf()
+ /* initialSize */ 4096,
+ pageSizeBytes
);
}