diff options
Diffstat (limited to 'core')
-rw-r--r-- | core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java | 58 |
1 files changed, 15 insertions, 43 deletions
diff --git a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java index e36709c6fc..07241c827c 100644 --- a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java +++ b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java @@ -35,7 +35,6 @@ import org.apache.spark.storage.BlockManager; import org.apache.spark.unsafe.Platform; import org.apache.spark.unsafe.array.ByteArrayMethods; import org.apache.spark.unsafe.array.LongArray; -import org.apache.spark.unsafe.bitset.BitSet; import org.apache.spark.unsafe.hash.Murmur3_x86_32; import org.apache.spark.unsafe.memory.MemoryBlock; import org.apache.spark.unsafe.memory.MemoryLocation; @@ -123,12 +122,6 @@ public final class BytesToBytesMap extends MemoryConsumer { */ private boolean canGrowArray = true; - /** - * A {@link BitSet} used to track location of the map where the key is set. - * Size of the bitset should be half of the size of the long array. - */ - @Nullable private BitSet bitset; - private final double loadFactor; /** @@ -427,7 +420,6 @@ public final class BytesToBytesMap extends MemoryConsumer { * This is a thread-safe version of `lookup`, could be used by multiple threads. */ public void safeLookup(Object keyBase, long keyOffset, int keyLength, Location loc) { - assert(bitset != null); assert(longArray != null); if (enablePerfMetrics) { @@ -440,7 +432,7 @@ public final class BytesToBytesMap extends MemoryConsumer { if (enablePerfMetrics) { numProbes++; } - if (!bitset.isSet(pos)) { + if (longArray.get(pos * 2) == 0) { // This is a new key. loc.with(pos, hashcode, false); return; @@ -644,7 +636,6 @@ public final class BytesToBytesMap extends MemoryConsumer { assert (!isDefined) : "Can only set value once for a key"; assert (keyLength % 8 == 0); assert (valueLength % 8 == 0); - assert(bitset != null); assert(longArray != null); if (numElements == MAX_CAPACITY || !canGrowArray) { @@ -678,7 +669,6 @@ public final class BytesToBytesMap extends MemoryConsumer { Platform.putInt(base, offset, Platform.getInt(base, offset) + 1); pageCursor += recordLength; numElements++; - bitset.set(pos); final long storedKeyAddress = taskMemoryManager.encodePageNumberAndOffset( currentPage, recordOffset); longArray.set(pos * 2, storedKeyAddress); @@ -734,7 +724,6 @@ public final class BytesToBytesMap extends MemoryConsumer { assert (capacity <= MAX_CAPACITY); acquireMemory(capacity * 16); longArray = new LongArray(MemoryBlock.fromLongArray(new long[capacity * 2])); - bitset = new BitSet(MemoryBlock.fromLongArray(new long[capacity / 64])); this.growthThreshold = (int) (capacity * loadFactor); this.mask = capacity - 1; @@ -749,7 +738,6 @@ public final class BytesToBytesMap extends MemoryConsumer { long used = longArray.memoryBlock().size(); longArray = null; releaseMemory(used); - bitset = null; } } @@ -795,9 +783,7 @@ public final class BytesToBytesMap extends MemoryConsumer { for (MemoryBlock dataPage : dataPages) { totalDataPagesSize += dataPage.size(); } - return totalDataPagesSize + - ((bitset != null) ? bitset.memoryBlock().size() : 0L) + - ((longArray != null) ? longArray.memoryBlock().size() : 0L); + return totalDataPagesSize + ((longArray != null) ? longArray.memoryBlock().size() : 0L); } private void updatePeakMemoryUsed() { @@ -852,7 +838,6 @@ public final class BytesToBytesMap extends MemoryConsumer { */ @VisibleForTesting void growAndRehash() { - assert(bitset != null); assert(longArray != null); long resizeStartTime = -1; @@ -861,39 +846,26 @@ public final class BytesToBytesMap extends MemoryConsumer { } // Store references to the old data structures to be used when we re-hash final LongArray oldLongArray = longArray; - final BitSet oldBitSet = bitset; - final int oldCapacity = (int) oldBitSet.capacity(); + final int oldCapacity = (int) oldLongArray.size() / 2; // Allocate the new data structures - try { - allocate(Math.min(growthStrategy.nextCapacity(oldCapacity), MAX_CAPACITY)); - } catch (OutOfMemoryError oom) { - longArray = oldLongArray; - bitset = oldBitSet; - throw oom; - } + allocate(Math.min(growthStrategy.nextCapacity(oldCapacity), MAX_CAPACITY)); // Re-mask (we don't recompute the hashcode because we stored all 32 bits of it) - for (int pos = oldBitSet.nextSetBit(0); pos >= 0; pos = oldBitSet.nextSetBit(pos + 1)) { - final long keyPointer = oldLongArray.get(pos * 2); - final int hashcode = (int) oldLongArray.get(pos * 2 + 1); + for (int i = 0; i < oldLongArray.size(); i += 2) { + final long keyPointer = oldLongArray.get(i); + if (keyPointer == 0) { + continue; + } + final int hashcode = (int) oldLongArray.get(i + 1); int newPos = hashcode & mask; int step = 1; - boolean keepGoing = true; - - // No need to check for equality here when we insert so this has one less if branch than - // the similar code path in addWithoutResize. - while (keepGoing) { - if (!bitset.isSet(newPos)) { - bitset.set(newPos); - longArray.set(newPos * 2, keyPointer); - longArray.set(newPos * 2 + 1, hashcode); - keepGoing = false; - } else { - newPos = (newPos + step) & mask; - step++; - } + while (longArray.get(newPos * 2) != 0) { + newPos = (newPos + step) & mask; + step++; } + longArray.set(newPos * 2, keyPointer); + longArray.set(newPos * 2 + 1, hashcode); } releaseMemory(oldLongArray.memoryBlock().size()); |