aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorAndrew Or <andrew@databricks.com>2015-08-07 14:20:13 -0700
committerYin Huai <yhuai@databricks.com>2015-08-07 14:20:13 -0700
commit881548ab20fa4c4b635c51d956b14bd13981e2f4 (patch)
treec29a335f8f90d664a77bab926d866468922c762b /core
parent05d04e10a8ea030bea840c3c5ba93ecac479a039 (diff)
downloadspark-881548ab20fa4c4b635c51d956b14bd13981e2f4.tar.gz
spark-881548ab20fa4c4b635c51d956b14bd13981e2f4.tar.bz2
spark-881548ab20fa4c4b635c51d956b14bd13981e2f4.zip
[SPARK-9674] Re-enable ignored test in SQLQuerySuite
The original code that this test tests is removed in https://github.com/apache/spark/commit/9270bd06fd0b16892e3f37213b5bc7813ea11fdd. It was ignored shortly before that so we never caught it. This patch re-enables the test and adds the code necessary to make it pass. JoshRosen yhuai Author: Andrew Or <andrew@databricks.com> Closes #8015 from andrewor14/SPARK-9674 and squashes the following commits: 225eac2 [Andrew Or] Merge branch 'master' of github.com:apache/spark into SPARK-9674 8c24209 [Andrew Or] Fix NPE e541d64 [Andrew Or] Track aggregation memory for both sort and hash 0be3a42 [Andrew Or] Fix test
Diffstat (limited to 'core')
-rw-r--r--core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java37
-rw-r--r--core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java20
2 files changed, 46 insertions, 11 deletions
diff --git a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
index 0636ae7c8d..7f79cd13aa 100644
--- a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
+++ b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
@@ -109,7 +109,7 @@ public final class BytesToBytesMap {
* Position {@code 2 * i} in the array is used to track a pointer to the key at index {@code i},
* while position {@code 2 * i + 1} in the array holds key's full 32-bit hashcode.
*/
- private LongArray longArray;
+ @Nullable private LongArray longArray;
// TODO: we're wasting 32 bits of space here; we can probably store fewer bits of the hashcode
// and exploit word-alignment to use fewer bits to hold the address. This might let us store
// only one long per map entry, increasing the chance that this array will fit in cache at the
@@ -124,7 +124,7 @@ public final class BytesToBytesMap {
* A {@link BitSet} used to track location of the map where the key is set.
* Size of the bitset should be half of the size of the long array.
*/
- private BitSet bitset;
+ @Nullable private BitSet bitset;
private final double loadFactor;
@@ -166,6 +166,8 @@ public final class BytesToBytesMap {
private long numHashCollisions = 0;
+ private long peakMemoryUsedBytes = 0L;
+
public BytesToBytesMap(
TaskMemoryManager taskMemoryManager,
ShuffleMemoryManager shuffleMemoryManager,
@@ -321,6 +323,9 @@ public final class BytesToBytesMap {
Object keyBaseObject,
long keyBaseOffset,
int keyRowLengthBytes) {
+ assert(bitset != null);
+ assert(longArray != null);
+
if (enablePerfMetrics) {
numKeyLookups++;
}
@@ -410,6 +415,7 @@ public final class BytesToBytesMap {
}
private Location with(int pos, int keyHashcode, boolean isDefined) {
+ assert(longArray != null);
this.pos = pos;
this.isDefined = isDefined;
this.keyHashcode = keyHashcode;
@@ -525,6 +531,9 @@ public final class BytesToBytesMap {
assert (!isDefined) : "Can only set value once for a key";
assert (keyLengthBytes % 8 == 0);
assert (valueLengthBytes % 8 == 0);
+ assert(bitset != null);
+ assert(longArray != null);
+
if (numElements == MAX_CAPACITY) {
throw new IllegalStateException("BytesToBytesMap has reached maximum capacity");
}
@@ -658,6 +667,7 @@ public final class BytesToBytesMap {
* This method is idempotent and can be called multiple times.
*/
public void free() {
+ updatePeakMemoryUsed();
longArray = null;
bitset = null;
Iterator<MemoryBlock> dataPagesIterator = dataPages.iterator();
@@ -684,14 +694,30 @@ public final class BytesToBytesMap {
/**
* Returns the total amount of memory, in bytes, consumed by this map's managed structures.
- * Note that this is also the peak memory used by this map, since the map is append-only.
*/
public long getTotalMemoryConsumption() {
long totalDataPagesSize = 0L;
for (MemoryBlock dataPage : dataPages) {
totalDataPagesSize += dataPage.size();
}
- return totalDataPagesSize + bitset.memoryBlock().size() + longArray.memoryBlock().size();
+ return totalDataPagesSize +
+ ((bitset != null) ? bitset.memoryBlock().size() : 0L) +
+ ((longArray != null) ? longArray.memoryBlock().size() : 0L);
+ }
+
+ private void updatePeakMemoryUsed() {
+ long mem = getTotalMemoryConsumption();
+ if (mem > peakMemoryUsedBytes) {
+ peakMemoryUsedBytes = mem;
+ }
+ }
+
+ /**
+ * Return the peak memory used so far, in bytes.
+ */
+ public long getPeakMemoryUsedBytes() {
+ updatePeakMemoryUsed();
+ return peakMemoryUsedBytes;
}
/**
@@ -731,6 +757,9 @@ public final class BytesToBytesMap {
*/
@VisibleForTesting
void growAndRehash() {
+ assert(bitset != null);
+ assert(longArray != null);
+
long resizeStartTime = -1;
if (enablePerfMetrics) {
resizeStartTime = System.nanoTime();
diff --git a/core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java b/core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java
index 0b11562980..e56a3f0b6d 100644
--- a/core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java
+++ b/core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java
@@ -525,7 +525,7 @@ public abstract class AbstractBytesToBytesMapSuite {
}
@Test
- public void testTotalMemoryConsumption() {
+ public void testPeakMemoryUsed() {
final long recordLengthBytes = 24;
final long pageSizeBytes = 256 + 8; // 8 bytes for end-of-page marker
final long numRecordsPerPage = (pageSizeBytes - 8) / recordLengthBytes;
@@ -536,8 +536,8 @@ public abstract class AbstractBytesToBytesMapSuite {
// monotonically increasing. More specifically, every time we allocate a new page it
// should increase by exactly the size of the page. In this regard, the memory usage
// at any given time is also the peak memory used.
- long previousMemory = map.getTotalMemoryConsumption();
- long newMemory;
+ long previousPeakMemory = map.getPeakMemoryUsedBytes();
+ long newPeakMemory;
try {
for (long i = 0; i < numRecordsPerPage * 10; i++) {
final long[] value = new long[]{i};
@@ -548,15 +548,21 @@ public abstract class AbstractBytesToBytesMapSuite {
value,
PlatformDependent.LONG_ARRAY_OFFSET,
8);
- newMemory = map.getTotalMemoryConsumption();
+ newPeakMemory = map.getPeakMemoryUsedBytes();
if (i % numRecordsPerPage == 0) {
// We allocated a new page for this record, so peak memory should change
- assertEquals(previousMemory + pageSizeBytes, newMemory);
+ assertEquals(previousPeakMemory + pageSizeBytes, newPeakMemory);
} else {
- assertEquals(previousMemory, newMemory);
+ assertEquals(previousPeakMemory, newPeakMemory);
}
- previousMemory = newMemory;
+ previousPeakMemory = newPeakMemory;
}
+
+ // Freeing the map should not change the peak memory
+ map.free();
+ newPeakMemory = map.getPeakMemoryUsedBytes();
+ assertEquals(previousPeakMemory, newPeakMemory);
+
} finally {
map.free();
}