From 881548ab20fa4c4b635c51d956b14bd13981e2f4 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Fri, 7 Aug 2015 14:20:13 -0700 Subject: [SPARK-9674] Re-enable ignored test in SQLQuerySuite The original code that this test tests is removed in https://github.com/apache/spark/commit/9270bd06fd0b16892e3f37213b5bc7813ea11fdd. It was ignored shortly before that so we never caught it. This patch re-enables the test and adds the code necessary to make it pass. JoshRosen yhuai Author: Andrew Or Closes #8015 from andrewor14/SPARK-9674 and squashes the following commits: 225eac2 [Andrew Or] Merge branch 'master' of github.com:apache/spark into SPARK-9674 8c24209 [Andrew Or] Fix NPE e541d64 [Andrew Or] Track aggregation memory for both sort and hash 0be3a42 [Andrew Or] Fix test --- .../apache/spark/unsafe/map/BytesToBytesMap.java | 37 +++++++++++++++++++--- .../unsafe/map/AbstractBytesToBytesMapSuite.java | 20 ++++++++---- 2 files changed, 46 insertions(+), 11 deletions(-) (limited to 'core') diff --git a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java index 0636ae7c8d..7f79cd13aa 100644 --- a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java +++ b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java @@ -109,7 +109,7 @@ public final class BytesToBytesMap { * Position {@code 2 * i} in the array is used to track a pointer to the key at index {@code i}, * while position {@code 2 * i + 1} in the array holds key's full 32-bit hashcode. */ - private LongArray longArray; + @Nullable private LongArray longArray; // TODO: we're wasting 32 bits of space here; we can probably store fewer bits of the hashcode // and exploit word-alignment to use fewer bits to hold the address. This might let us store // only one long per map entry, increasing the chance that this array will fit in cache at the @@ -124,7 +124,7 @@ public final class BytesToBytesMap { * A {@link BitSet} used to track location of the map where the key is set. * Size of the bitset should be half of the size of the long array. */ - private BitSet bitset; + @Nullable private BitSet bitset; private final double loadFactor; @@ -166,6 +166,8 @@ public final class BytesToBytesMap { private long numHashCollisions = 0; + private long peakMemoryUsedBytes = 0L; + public BytesToBytesMap( TaskMemoryManager taskMemoryManager, ShuffleMemoryManager shuffleMemoryManager, @@ -321,6 +323,9 @@ public final class BytesToBytesMap { Object keyBaseObject, long keyBaseOffset, int keyRowLengthBytes) { + assert(bitset != null); + assert(longArray != null); + if (enablePerfMetrics) { numKeyLookups++; } @@ -410,6 +415,7 @@ public final class BytesToBytesMap { } private Location with(int pos, int keyHashcode, boolean isDefined) { + assert(longArray != null); this.pos = pos; this.isDefined = isDefined; this.keyHashcode = keyHashcode; @@ -525,6 +531,9 @@ public final class BytesToBytesMap { assert (!isDefined) : "Can only set value once for a key"; assert (keyLengthBytes % 8 == 0); assert (valueLengthBytes % 8 == 0); + assert(bitset != null); + assert(longArray != null); + if (numElements == MAX_CAPACITY) { throw new IllegalStateException("BytesToBytesMap has reached maximum capacity"); } @@ -658,6 +667,7 @@ public final class BytesToBytesMap { * This method is idempotent and can be called multiple times. */ public void free() { + updatePeakMemoryUsed(); longArray = null; bitset = null; Iterator dataPagesIterator = dataPages.iterator(); @@ -684,14 +694,30 @@ public final class BytesToBytesMap { /** * Returns the total amount of memory, in bytes, consumed by this map's managed structures. - * Note that this is also the peak memory used by this map, since the map is append-only. */ public long getTotalMemoryConsumption() { long totalDataPagesSize = 0L; for (MemoryBlock dataPage : dataPages) { totalDataPagesSize += dataPage.size(); } - return totalDataPagesSize + bitset.memoryBlock().size() + longArray.memoryBlock().size(); + return totalDataPagesSize + + ((bitset != null) ? bitset.memoryBlock().size() : 0L) + + ((longArray != null) ? longArray.memoryBlock().size() : 0L); + } + + private void updatePeakMemoryUsed() { + long mem = getTotalMemoryConsumption(); + if (mem > peakMemoryUsedBytes) { + peakMemoryUsedBytes = mem; + } + } + + /** + * Return the peak memory used so far, in bytes. + */ + public long getPeakMemoryUsedBytes() { + updatePeakMemoryUsed(); + return peakMemoryUsedBytes; } /** @@ -731,6 +757,9 @@ public final class BytesToBytesMap { */ @VisibleForTesting void growAndRehash() { + assert(bitset != null); + assert(longArray != null); + long resizeStartTime = -1; if (enablePerfMetrics) { resizeStartTime = System.nanoTime(); diff --git a/core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java b/core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java index 0b11562980..e56a3f0b6d 100644 --- a/core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java +++ b/core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java @@ -525,7 +525,7 @@ public abstract class AbstractBytesToBytesMapSuite { } @Test - public void testTotalMemoryConsumption() { + public void testPeakMemoryUsed() { final long recordLengthBytes = 24; final long pageSizeBytes = 256 + 8; // 8 bytes for end-of-page marker final long numRecordsPerPage = (pageSizeBytes - 8) / recordLengthBytes; @@ -536,8 +536,8 @@ public abstract class AbstractBytesToBytesMapSuite { // monotonically increasing. More specifically, every time we allocate a new page it // should increase by exactly the size of the page. In this regard, the memory usage // at any given time is also the peak memory used. - long previousMemory = map.getTotalMemoryConsumption(); - long newMemory; + long previousPeakMemory = map.getPeakMemoryUsedBytes(); + long newPeakMemory; try { for (long i = 0; i < numRecordsPerPage * 10; i++) { final long[] value = new long[]{i}; @@ -548,15 +548,21 @@ public abstract class AbstractBytesToBytesMapSuite { value, PlatformDependent.LONG_ARRAY_OFFSET, 8); - newMemory = map.getTotalMemoryConsumption(); + newPeakMemory = map.getPeakMemoryUsedBytes(); if (i % numRecordsPerPage == 0) { // We allocated a new page for this record, so peak memory should change - assertEquals(previousMemory + pageSizeBytes, newMemory); + assertEquals(previousPeakMemory + pageSizeBytes, newPeakMemory); } else { - assertEquals(previousMemory, newMemory); + assertEquals(previousPeakMemory, newPeakMemory); } - previousMemory = newMemory; + previousPeakMemory = newPeakMemory; } + + // Freeing the map should not change the peak memory + map.free(); + newPeakMemory = map.getPeakMemoryUsedBytes(); + assertEquals(previousPeakMemory, newPeakMemory); + } finally { map.free(); } -- cgit v1.2.3