author    Yin Huai <yhuai@databricks.com>    2015-08-05 19:19:09 -0700
committer Yin Huai <yhuai@databricks.com>    2015-08-05 19:19:09 -0700
commit    4581badbc8aa7e5a37ba7f7f83cc3860240f5dd3
tree      7a67455f97f5915aa1f47a0efadd33f812dede1e /core
parent    4399b7b0903d830313ab7e69731c11d587ae567c
[SPARK-9611] [SQL] Fixes a few corner cases when we spill an UnsafeFixedWidthAggregationMap
This PR has the following three small fixes:

1. UnsafeKVExternalSorter does not use 0 as the initialSize to create an UnsafeInMemorySorter if its BytesToBytesMap is empty.
2. We do not spill an InMemorySorter if it is empty.
3. We do not add a SpillReader to a SpillMerger if this SpillReader is empty.

JIRA: https://issues.apache.org/jira/browse/SPARK-9611

Author: Yin Huai <yhuai@databricks.com>

Closes #7948 from yhuai/unsafeEmptyMap and squashes the following commits:

9727abe [Yin Huai] Address Josh's comments.
34b6f76 [Yin Huai] 1. UnsafeKVExternalSorter does not use 0 as the initialSize to create an UnsafeInMemorySorter if its BytesToBytesMap is empty. 2. Do not spill an InMemorySorter if it is empty. 3. Do not add a spill to the SpillMerger if this spill is empty.
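An aside on fix 1: one plausible reason an initialSize of 0 is dangerous is that a backing array grown by doubling its current length can never leave capacity 0. The sketch below illustrates that pitfall with a hypothetical GrowableLongBuffer; it is only an assumption about the failure mode, not the actual UnsafeInMemorySorter or UnsafeKVExternalSorter code.

import java.util.Arrays;

// Hypothetical buffer with a doubling growth policy; a sketch of why an
// initial capacity of 0 is unsafe, not the actual Spark sorter code.
final class GrowableLongBuffer {
  private long[] array;
  private int size = 0;

  GrowableLongBuffer(int initialCapacity) {
    this.array = new long[initialCapacity];
  }

  void append(long value) {
    if (size == array.length) {
      // Doubling 0 yields 0, so a buffer created with capacity 0 would never
      // grow; guard against that explicitly here.
      int newCapacity = Math.max(1, array.length * 2);
      array = Arrays.copyOf(array, newCapacity);
    }
    array[size++] = value;
  }

  int size() { return size; }

  public static void main(String[] args) {
    // Without the Math.max guard, 0 * 2 == 0 and the first append would fail
    // with an ArrayIndexOutOfBoundsException.
    GrowableLongBuffer buffer = new GrowableLongBuffer(0);
    buffer.append(42L);
    System.out.println("size = " + buffer.size()); // prints "size = 1"
  }
}

The commit sidesteps the question at the call site: UnsafeKVExternalSorter simply never passes 0 as the initialSize when its BytesToBytesMap is empty.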
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java   | 36
-rw-r--r--  core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillMerger.java | 12
2 files changed, 30 insertions(+), 18 deletions(-)
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
index e6ddd08e5f..8f78fc5a41 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
@@ -191,24 +191,29 @@ public final class UnsafeExternalSorter {
spillWriters.size(),
spillWriters.size() > 1 ? " times" : " time");
- final UnsafeSorterSpillWriter spillWriter =
- new UnsafeSorterSpillWriter(blockManager, fileBufferSizeBytes, writeMetrics,
- inMemSorter.numRecords());
- spillWriters.add(spillWriter);
- final UnsafeSorterIterator sortedRecords = inMemSorter.getSortedIterator();
- while (sortedRecords.hasNext()) {
- sortedRecords.loadNext();
- final Object baseObject = sortedRecords.getBaseObject();
- final long baseOffset = sortedRecords.getBaseOffset();
- final int recordLength = sortedRecords.getRecordLength();
- spillWriter.write(baseObject, baseOffset, recordLength, sortedRecords.getKeyPrefix());
+ // We only write out the contents of the inMemSorter if it is not empty.
+ if (inMemSorter.numRecords() > 0) {
+ final UnsafeSorterSpillWriter spillWriter =
+ new UnsafeSorterSpillWriter(blockManager, fileBufferSizeBytes, writeMetrics,
+ inMemSorter.numRecords());
+ spillWriters.add(spillWriter);
+ final UnsafeSorterIterator sortedRecords = inMemSorter.getSortedIterator();
+ while (sortedRecords.hasNext()) {
+ sortedRecords.loadNext();
+ final Object baseObject = sortedRecords.getBaseObject();
+ final long baseOffset = sortedRecords.getBaseOffset();
+ final int recordLength = sortedRecords.getRecordLength();
+ spillWriter.write(baseObject, baseOffset, recordLength, sortedRecords.getKeyPrefix());
+ }
+ spillWriter.close();
}
- spillWriter.close();
+
final long spillSize = freeMemory();
// Note that this is more-or-less going to be a multiple of the page size, so wasted space in
// pages will currently be counted as memory spilled even though that space isn't actually
// written to disk. This also counts the space needed to store the sorter's pointer array.
taskContext.taskMetrics().incMemoryBytesSpilled(spillSize);
+
initializeForWriting();
}
@@ -505,12 +510,11 @@ public final class UnsafeExternalSorter {
final UnsafeSorterSpillMerger spillMerger =
new UnsafeSorterSpillMerger(recordComparator, prefixComparator, numIteratorsToMerge);
for (UnsafeSorterSpillWriter spillWriter : spillWriters) {
- spillMerger.addSpill(spillWriter.getReader(blockManager));
+ spillMerger.addSpillIfNotEmpty(spillWriter.getReader(blockManager));
}
spillWriters.clear();
- if (inMemoryIterator.hasNext()) {
- spillMerger.addSpill(inMemoryIterator);
- }
+ spillMerger.addSpillIfNotEmpty(inMemoryIterator);
+
return spillMerger.getSortedIterator();
}
}
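A minimal sketch of the guard introduced in spill() above: a spill file is produced only when the in-memory buffer actually holds records, and the buffer is released either way. The names below (SpillGuardSketch, InMemoryBuffer) are hypothetical stand-ins, not the real UnsafeExternalSorter or UnsafeSorterSpillWriter.

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical stand-ins for the Spark classes; a sketch of the "skip empty
// spills" guard only, not the actual UnsafeExternalSorter implementation.
final class SpillGuardSketch {

  // Tiny in-memory buffer standing in for the in-memory sorter.
  static final class InMemoryBuffer {
    private final List<Long> records = new ArrayList<>();
    int numRecords() { return records.size(); }
    void insert(long record) { records.add(record); }
    List<Long> sortedRecords() {
      List<Long> copy = new ArrayList<>(records);
      copy.sort(Comparator.naturalOrder());
      return copy;
    }
    void reset() { records.clear(); }
  }

  private final InMemoryBuffer buffer = new InMemoryBuffer();
  private final List<List<Long>> spillFiles = new ArrayList<>();

  // Mirrors the guarded spill(): write a spill "file" only when there is
  // something to write, then release the in-memory buffer either way.
  void spill() {
    if (buffer.numRecords() > 0) {
      spillFiles.add(buffer.sortedRecords());
    }
    buffer.reset();
  }

  public static void main(String[] args) {
    SpillGuardSketch sorter = new SpillGuardSketch();
    sorter.spill();                    // empty buffer: no spill file is created
    sorter.buffer.insert(42L);
    sorter.spill();                    // one spill file with one record
    System.out.println("spill files: " + sorter.spillFiles.size()); // prints 1
  }
}

Before this change, an empty in-memory sorter still produced a zero-record spill writer, which then had to be handled by the merger below.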
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillMerger.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillMerger.java
index 8272c2a5be..3874a9f9cb 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillMerger.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillMerger.java
@@ -47,11 +47,19 @@ final class UnsafeSorterSpillMerger {
priorityQueue = new PriorityQueue<UnsafeSorterIterator>(numSpills, comparator);
}
- public void addSpill(UnsafeSorterIterator spillReader) throws IOException {
+ /**
+ * Add an UnsafeSorterIterator to this merger, skipping it if it is empty.
+ */
+ public void addSpillIfNotEmpty(UnsafeSorterIterator spillReader) throws IOException {
if (spillReader.hasNext()) {
+ // We only add the spillReader to the priorityQueue if it is not empty. We do this to
+ // make sure that the hasNext method of the UnsafeSorterIterator returned by getSortedIterator
+ // does not return a wrong result, because hasNext will return true
+ // at least priorityQueue.size() times. If we allowed n empty spillReaders into the
+ // priorityQueue, we would get n extra empty records in the resulting UnsafeSorterIterator.
spillReader.loadNext();
+ priorityQueue.add(spillReader);
}
- priorityQueue.add(spillReader);
}
public UnsafeSorterIterator getSortedIterator() throws IOException {
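To make the comment in addSpillIfNotEmpty concrete, here is a self-contained sketch of the same merge discipline built on plain java.util types (hypothetical names, not the real UnsafeSorterSpillMerger): every source has its first record loaded before it enters the priority queue, and an empty source is never enqueued, so treating "queue not empty" as hasNext for the merged output stays correct.

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical sketch of a k-way merge with the same "load first, enqueue only
// if non-empty" discipline as addSpillIfNotEmpty; not the real Spark classes.
final class KWayMergeSketch {

  // Stand-in for UnsafeSorterIterator: loadNext() must be called before the
  // current record can be read, mirroring the hasNext()/loadNext() protocol.
  static final class RecordIterator {
    private final long[] records;
    private int pos = -1;
    RecordIterator(long... records) { this.records = records; }
    boolean hasNext() { return pos + 1 < records.length; }
    void loadNext() { pos++; }
    long current() { return records[pos]; }
  }

  private final PriorityQueue<RecordIterator> queue =
      new PriorityQueue<>(Comparator.comparingLong(RecordIterator::current));

  // Mirrors addSpillIfNotEmpty: empty sources never reach the queue, so every
  // queued iterator holds a valid current record.
  void addSourceIfNotEmpty(RecordIterator it) {
    if (it.hasNext()) {
      it.loadNext();
      queue.add(it);
    }
  }

  // Merged output: "has more records" is simply "queue not empty", which is
  // only correct because of the invariant enforced above.
  List<Long> merge() {
    List<Long> out = new ArrayList<>();
    while (!queue.isEmpty()) {
      RecordIterator head = queue.poll();
      out.add(head.current());
      if (head.hasNext()) {
        head.loadNext();
        queue.add(head);
      }
    }
    return out;
  }

  public static void main(String[] args) {
    KWayMergeSketch merger = new KWayMergeSketch();
    merger.addSourceIfNotEmpty(new RecordIterator(1, 4, 7));
    merger.addSourceIfNotEmpty(new RecordIterator());        // empty: skipped
    merger.addSourceIfNotEmpty(new RecordIterator(2, 3));
    System.out.println(merger.merge());                      // [1, 2, 3, 4, 7]
  }
}

If the empty source were enqueued anyway, the merged output would claim one more record than actually exists and would try to read a record that was never loaded, which is exactly the corner case the patch closes.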