aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
Diffstat (limited to 'core')
-rw-r--r--core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java58
1 files changed, 15 insertions, 43 deletions
diff --git a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
index e36709c6fc..07241c827c 100644
--- a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
+++ b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
@@ -35,7 +35,6 @@ import org.apache.spark.storage.BlockManager;
import org.apache.spark.unsafe.Platform;
import org.apache.spark.unsafe.array.ByteArrayMethods;
import org.apache.spark.unsafe.array.LongArray;
-import org.apache.spark.unsafe.bitset.BitSet;
import org.apache.spark.unsafe.hash.Murmur3_x86_32;
import org.apache.spark.unsafe.memory.MemoryBlock;
import org.apache.spark.unsafe.memory.MemoryLocation;
@@ -123,12 +122,6 @@ public final class BytesToBytesMap extends MemoryConsumer {
*/
private boolean canGrowArray = true;
- /**
- * A {@link BitSet} used to track location of the map where the key is set.
- * Size of the bitset should be half of the size of the long array.
- */
- @Nullable private BitSet bitset;
-
private final double loadFactor;
/**
@@ -427,7 +420,6 @@ public final class BytesToBytesMap extends MemoryConsumer {
* This is a thread-safe version of `lookup`, could be used by multiple threads.
*/
public void safeLookup(Object keyBase, long keyOffset, int keyLength, Location loc) {
- assert(bitset != null);
assert(longArray != null);
if (enablePerfMetrics) {
@@ -440,7 +432,7 @@ public final class BytesToBytesMap extends MemoryConsumer {
if (enablePerfMetrics) {
numProbes++;
}
- if (!bitset.isSet(pos)) {
+ if (longArray.get(pos * 2) == 0) {
// This is a new key.
loc.with(pos, hashcode, false);
return;
@@ -644,7 +636,6 @@ public final class BytesToBytesMap extends MemoryConsumer {
assert (!isDefined) : "Can only set value once for a key";
assert (keyLength % 8 == 0);
assert (valueLength % 8 == 0);
- assert(bitset != null);
assert(longArray != null);
if (numElements == MAX_CAPACITY || !canGrowArray) {
@@ -678,7 +669,6 @@ public final class BytesToBytesMap extends MemoryConsumer {
Platform.putInt(base, offset, Platform.getInt(base, offset) + 1);
pageCursor += recordLength;
numElements++;
- bitset.set(pos);
final long storedKeyAddress = taskMemoryManager.encodePageNumberAndOffset(
currentPage, recordOffset);
longArray.set(pos * 2, storedKeyAddress);
@@ -734,7 +724,6 @@ public final class BytesToBytesMap extends MemoryConsumer {
assert (capacity <= MAX_CAPACITY);
acquireMemory(capacity * 16);
longArray = new LongArray(MemoryBlock.fromLongArray(new long[capacity * 2]));
- bitset = new BitSet(MemoryBlock.fromLongArray(new long[capacity / 64]));
this.growthThreshold = (int) (capacity * loadFactor);
this.mask = capacity - 1;
@@ -749,7 +738,6 @@ public final class BytesToBytesMap extends MemoryConsumer {
long used = longArray.memoryBlock().size();
longArray = null;
releaseMemory(used);
- bitset = null;
}
}
@@ -795,9 +783,7 @@ public final class BytesToBytesMap extends MemoryConsumer {
for (MemoryBlock dataPage : dataPages) {
totalDataPagesSize += dataPage.size();
}
- return totalDataPagesSize +
- ((bitset != null) ? bitset.memoryBlock().size() : 0L) +
- ((longArray != null) ? longArray.memoryBlock().size() : 0L);
+ return totalDataPagesSize + ((longArray != null) ? longArray.memoryBlock().size() : 0L);
}
private void updatePeakMemoryUsed() {
@@ -852,7 +838,6 @@ public final class BytesToBytesMap extends MemoryConsumer {
*/
@VisibleForTesting
void growAndRehash() {
- assert(bitset != null);
assert(longArray != null);
long resizeStartTime = -1;
@@ -861,39 +846,26 @@ public final class BytesToBytesMap extends MemoryConsumer {
}
// Store references to the old data structures to be used when we re-hash
final LongArray oldLongArray = longArray;
- final BitSet oldBitSet = bitset;
- final int oldCapacity = (int) oldBitSet.capacity();
+ final int oldCapacity = (int) oldLongArray.size() / 2;
// Allocate the new data structures
- try {
- allocate(Math.min(growthStrategy.nextCapacity(oldCapacity), MAX_CAPACITY));
- } catch (OutOfMemoryError oom) {
- longArray = oldLongArray;
- bitset = oldBitSet;
- throw oom;
- }
+ allocate(Math.min(growthStrategy.nextCapacity(oldCapacity), MAX_CAPACITY));
// Re-mask (we don't recompute the hashcode because we stored all 32 bits of it)
- for (int pos = oldBitSet.nextSetBit(0); pos >= 0; pos = oldBitSet.nextSetBit(pos + 1)) {
- final long keyPointer = oldLongArray.get(pos * 2);
- final int hashcode = (int) oldLongArray.get(pos * 2 + 1);
+ for (int i = 0; i < oldLongArray.size(); i += 2) {
+ final long keyPointer = oldLongArray.get(i);
+ if (keyPointer == 0) {
+ continue;
+ }
+ final int hashcode = (int) oldLongArray.get(i + 1);
int newPos = hashcode & mask;
int step = 1;
- boolean keepGoing = true;
-
- // No need to check for equality here when we insert so this has one less if branch than
- // the similar code path in addWithoutResize.
- while (keepGoing) {
- if (!bitset.isSet(newPos)) {
- bitset.set(newPos);
- longArray.set(newPos * 2, keyPointer);
- longArray.set(newPos * 2 + 1, hashcode);
- keepGoing = false;
- } else {
- newPos = (newPos + step) & mask;
- step++;
- }
+ while (longArray.get(newPos * 2) != 0) {
+ newPos = (newPos + step) & mask;
+ step++;
}
+ longArray.set(newPos * 2, keyPointer);
+ longArray.set(newPos * 2 + 1, hashcode);
}
releaseMemory(oldLongArray.memoryBlock().size());