diff options
author | Yin Huai <yhuai@databricks.com> | 2015-08-05 19:19:09 -0700 |
---|---|---|
committer | Yin Huai <yhuai@databricks.com> | 2015-08-05 19:19:09 -0700 |
commit | 4581badbc8aa7e5a37ba7f7f83cc3860240f5dd3 (patch) | |
tree | 7a67455f97f5915aa1f47a0efadd33f812dede1e /core | |
parent | 4399b7b0903d830313ab7e69731c11d587ae567c (diff) | |
download | spark-4581badbc8aa7e5a37ba7f7f83cc3860240f5dd3.tar.gz spark-4581badbc8aa7e5a37ba7f7f83cc3860240f5dd3.tar.bz2 spark-4581badbc8aa7e5a37ba7f7f83cc3860240f5dd3.zip |
[SPARK-9611] [SQL] Fixes a few corner cases when we spill a UnsafeFixedWidthAggregationMap
This PR has the following three small fixes.
1. UnsafeKVExternalSorter does not use 0 as the initialSize to create an UnsafeInMemorySorter if its BytesToBytesMap is empty.
2. We will not not spill a InMemorySorter if it is empty.
3. We will not add a SpillReader to a SpillMerger if this SpillReader is empty.
JIRA: https://issues.apache.org/jira/browse/SPARK-9611
Author: Yin Huai <yhuai@databricks.com>
Closes #7948 from yhuai/unsafeEmptyMap and squashes the following commits:
9727abe [Yin Huai] Address Josh's comments.
34b6f76 [Yin Huai] 1. UnsafeKVExternalSorter does not use 0 as the initialSize to create an UnsafeInMemorySorter if its BytesToBytesMap is empty. 2. Do not spill a InMemorySorter if it is empty. 3. Do not add spill to SpillMerger if this spill is empty.
Diffstat (limited to 'core')
2 files changed, 30 insertions, 18 deletions
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java index e6ddd08e5f..8f78fc5a41 100644 --- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java +++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java @@ -191,24 +191,29 @@ public final class UnsafeExternalSorter { spillWriters.size(), spillWriters.size() > 1 ? " times" : " time"); - final UnsafeSorterSpillWriter spillWriter = - new UnsafeSorterSpillWriter(blockManager, fileBufferSizeBytes, writeMetrics, - inMemSorter.numRecords()); - spillWriters.add(spillWriter); - final UnsafeSorterIterator sortedRecords = inMemSorter.getSortedIterator(); - while (sortedRecords.hasNext()) { - sortedRecords.loadNext(); - final Object baseObject = sortedRecords.getBaseObject(); - final long baseOffset = sortedRecords.getBaseOffset(); - final int recordLength = sortedRecords.getRecordLength(); - spillWriter.write(baseObject, baseOffset, recordLength, sortedRecords.getKeyPrefix()); + // We only write out contents of the inMemSorter if it is not empty. + if (inMemSorter.numRecords() > 0) { + final UnsafeSorterSpillWriter spillWriter = + new UnsafeSorterSpillWriter(blockManager, fileBufferSizeBytes, writeMetrics, + inMemSorter.numRecords()); + spillWriters.add(spillWriter); + final UnsafeSorterIterator sortedRecords = inMemSorter.getSortedIterator(); + while (sortedRecords.hasNext()) { + sortedRecords.loadNext(); + final Object baseObject = sortedRecords.getBaseObject(); + final long baseOffset = sortedRecords.getBaseOffset(); + final int recordLength = sortedRecords.getRecordLength(); + spillWriter.write(baseObject, baseOffset, recordLength, sortedRecords.getKeyPrefix()); + } + spillWriter.close(); } - spillWriter.close(); + final long spillSize = freeMemory(); // Note that this is more-or-less going to be a multiple of the page size, so wasted space in // pages will currently be counted as memory spilled even though that space isn't actually // written to disk. This also counts the space needed to store the sorter's pointer array. taskContext.taskMetrics().incMemoryBytesSpilled(spillSize); + initializeForWriting(); } @@ -505,12 +510,11 @@ public final class UnsafeExternalSorter { final UnsafeSorterSpillMerger spillMerger = new UnsafeSorterSpillMerger(recordComparator, prefixComparator, numIteratorsToMerge); for (UnsafeSorterSpillWriter spillWriter : spillWriters) { - spillMerger.addSpill(spillWriter.getReader(blockManager)); + spillMerger.addSpillIfNotEmpty(spillWriter.getReader(blockManager)); } spillWriters.clear(); - if (inMemoryIterator.hasNext()) { - spillMerger.addSpill(inMemoryIterator); - } + spillMerger.addSpillIfNotEmpty(inMemoryIterator); + return spillMerger.getSortedIterator(); } } diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillMerger.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillMerger.java index 8272c2a5be..3874a9f9cb 100644 --- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillMerger.java +++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillMerger.java @@ -47,11 +47,19 @@ final class UnsafeSorterSpillMerger { priorityQueue = new PriorityQueue<UnsafeSorterIterator>(numSpills, comparator); } - public void addSpill(UnsafeSorterIterator spillReader) throws IOException { + /** + * Add an UnsafeSorterIterator to this merger + */ + public void addSpillIfNotEmpty(UnsafeSorterIterator spillReader) throws IOException { if (spillReader.hasNext()) { + // We only add the spillReader to the priorityQueue if it is not empty. We do this to + // make sure the hasNext method of UnsafeSorterIterator returned by getSortedIterator + // does not return wrong result because hasNext will returns true + // at least priorityQueue.size() times. If we allow n spillReaders in the + // priorityQueue, we will have n extra empty records in the result of the UnsafeSorterIterator. spillReader.loadNext(); + priorityQueue.add(spillReader); } - priorityQueue.add(spillReader); } public UnsafeSorterIterator getSortedIterator() throws IOException { |