about | summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
-rw-r--r-- core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java | 58
-rw-r--r-- unsafe/src/main/java/org/apache/spark/unsafe/bitset/BitSet.java | 113
-rw-r--r-- unsafe/src/test/java/org/apache/spark/unsafe/bitset/BitSetSuite.java | 88
3 files changed, 15 insertions, 244 deletions
diff --git a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
index e36709c6fc..07241c827c 100644
--- a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
+++ b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
@@ -35,7 +35,6 @@ import org.apache.spark.storage.BlockManager;
import org.apache.spark.unsafe.Platform;
import org.apache.spark.unsafe.array.ByteArrayMethods;
import org.apache.spark.unsafe.array.LongArray;
-import org.apache.spark.unsafe.bitset.BitSet;
import org.apache.spark.unsafe.hash.Murmur3_x86_32;
import org.apache.spark.unsafe.memory.MemoryBlock;
import org.apache.spark.unsafe.memory.MemoryLocation;
@@ -123,12 +122,6 @@ public final class BytesToBytesMap extends MemoryConsumer {
*/
private boolean canGrowArray = true;
- /**
- * A {@link BitSet} used to track location of the map where the key is set.
- * Size of the bitset should be half of the size of the long array.
- */
- @Nullable private BitSet bitset;
-
private final double loadFactor;
/**
@@ -427,7 +420,6 @@ public final class BytesToBytesMap extends MemoryConsumer {
* This is a thread-safe version of `lookup`, could be used by multiple threads.
*/
public void safeLookup(Object keyBase, long keyOffset, int keyLength, Location loc) {
- assert(bitset != null);
assert(longArray != null);
if (enablePerfMetrics) {
@@ -440,7 +432,7 @@ public final class BytesToBytesMap extends MemoryConsumer {
if (enablePerfMetrics) {
numProbes++;
}
- if (!bitset.isSet(pos)) {
+ if (longArray.get(pos * 2) == 0) {
// This is a new key.
loc.with(pos, hashcode, false);
return;
@@ -644,7 +636,6 @@ public final class BytesToBytesMap extends MemoryConsumer {
assert (!isDefined) : "Can only set value once for a key";
assert (keyLength % 8 == 0);
assert (valueLength % 8 == 0);
- assert(bitset != null);
assert(longArray != null);
if (numElements == MAX_CAPACITY || !canGrowArray) {
@@ -678,7 +669,6 @@ public final class BytesToBytesMap extends MemoryConsumer {
Platform.putInt(base, offset, Platform.getInt(base, offset) + 1);
pageCursor += recordLength;
numElements++;
- bitset.set(pos);
final long storedKeyAddress = taskMemoryManager.encodePageNumberAndOffset(
currentPage, recordOffset);
longArray.set(pos * 2, storedKeyAddress);
@@ -734,7 +724,6 @@ public final class BytesToBytesMap extends MemoryConsumer {
assert (capacity <= MAX_CAPACITY);
acquireMemory(capacity * 16);
longArray = new LongArray(MemoryBlock.fromLongArray(new long[capacity * 2]));
- bitset = new BitSet(MemoryBlock.fromLongArray(new long[capacity / 64]));
this.growthThreshold = (int) (capacity * loadFactor);
this.mask = capacity - 1;
@@ -749,7 +738,6 @@ public final class BytesToBytesMap extends MemoryConsumer {
long used = longArray.memoryBlock().size();
longArray = null;
releaseMemory(used);
- bitset = null;
}
}
@@ -795,9 +783,7 @@ public final class BytesToBytesMap extends MemoryConsumer {
for (MemoryBlock dataPage : dataPages) {
totalDataPagesSize += dataPage.size();
}
- return totalDataPagesSize +
- ((bitset != null) ? bitset.memoryBlock().size() : 0L) +
- ((longArray != null) ? longArray.memoryBlock().size() : 0L);
+ return totalDataPagesSize + ((longArray != null) ? longArray.memoryBlock().size() : 0L);
}
private void updatePeakMemoryUsed() {
@@ -852,7 +838,6 @@ public final class BytesToBytesMap extends MemoryConsumer {
*/
@VisibleForTesting
void growAndRehash() {
- assert(bitset != null);
assert(longArray != null);
long resizeStartTime = -1;
@@ -861,39 +846,26 @@ public final class BytesToBytesMap extends MemoryConsumer {
}
// Store references to the old data structures to be used when we re-hash
final LongArray oldLongArray = longArray;
- final BitSet oldBitSet = bitset;
- final int oldCapacity = (int) oldBitSet.capacity();
+ final int oldCapacity = (int) oldLongArray.size() / 2;
// Allocate the new data structures
- try {
- allocate(Math.min(growthStrategy.nextCapacity(oldCapacity), MAX_CAPACITY));
- } catch (OutOfMemoryError oom) {
- longArray = oldLongArray;
- bitset = oldBitSet;
- throw oom;
- }
+ allocate(Math.min(growthStrategy.nextCapacity(oldCapacity), MAX_CAPACITY));
// Re-mask (we don't recompute the hashcode because we stored all 32 bits of it)
- for (int pos = oldBitSet.nextSetBit(0); pos >= 0; pos = oldBitSet.nextSetBit(pos + 1)) {
- final long keyPointer = oldLongArray.get(pos * 2);
- final int hashcode = (int) oldLongArray.get(pos * 2 + 1);
+ for (int i = 0; i < oldLongArray.size(); i += 2) {
+ final long keyPointer = oldLongArray.get(i);
+ if (keyPointer == 0) {
+ continue;
+ }
+ final int hashcode = (int) oldLongArray.get(i + 1);
int newPos = hashcode & mask;
int step = 1;
- boolean keepGoing = true;
-
- // No need to check for equality here when we insert so this has one less if branch than
- // the similar code path in addWithoutResize.
- while (keepGoing) {
- if (!bitset.isSet(newPos)) {
- bitset.set(newPos);
- longArray.set(newPos * 2, keyPointer);
- longArray.set(newPos * 2 + 1, hashcode);
- keepGoing = false;
- } else {
- newPos = (newPos + step) & mask;
- step++;
- }
+ while (longArray.get(newPos * 2) != 0) {
+ newPos = (newPos + step) & mask;
+ step++;
}
+ longArray.set(newPos * 2, keyPointer);
+ longArray.set(newPos * 2 + 1, hashcode);
}
releaseMemory(oldLongArray.memoryBlock().size());
diff --git a/unsafe/src/main/java/org/apache/spark/unsafe/bitset/BitSet.java b/unsafe/src/main/java/org/apache/spark/unsafe/bitset/BitSet.java
deleted file mode 100644
index 7c124173b0..0000000000
--- a/unsafe/src/main/java/org/apache/spark/unsafe/bitset/BitSet.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.unsafe.bitset;
-
-import org.apache.spark.unsafe.array.LongArray;
-import org.apache.spark.unsafe.memory.MemoryBlock;
-
-/**
- * A fixed size uncompressed bit set backed by a {@link LongArray}.
- *
- * Each bit occupies exactly one bit of storage.
- */
-public final class BitSet {
-
- /** A long array for the bits. */
- private final LongArray words;
-
- /** Length of the long array. */
- private final int numWords;
-
- private final Object baseObject;
- private final long baseOffset;
-
- /**
- * Creates a new {@link BitSet} using the specified memory block. Size of the memory block must be
- * multiple of 8 bytes (i.e. 64 bits).
- */
- public BitSet(MemoryBlock memory) {
- words = new LongArray(memory);
- assert (words.size() <= Integer.MAX_VALUE);
- numWords = (int) words.size();
- baseObject = words.memoryBlock().getBaseObject();
- baseOffset = words.memoryBlock().getBaseOffset();
- }
-
- public MemoryBlock memoryBlock() {
- return words.memoryBlock();
- }
-
- /**
- * Returns the number of bits in this {@code BitSet}.
- */
- public long capacity() {
- return numWords * 64;
- }
-
- /**
- * Sets the bit at the specified index to {@code true}.
- */
- public void set(int index) {
- assert index < numWords * 64 : "index (" + index + ") should < length (" + numWords * 64 + ")";
- BitSetMethods.set(baseObject, baseOffset, index);
- }
-
- /**
- * Sets the bit at the specified index to {@code false}.
- */
- public void unset(int index) {
- assert index < numWords * 64 : "index (" + index + ") should < length (" + numWords * 64 + ")";
- BitSetMethods.unset(baseObject, baseOffset, index);
- }
-
- /**
- * Returns {@code true} if the bit is set at the specified index.
- */
- public boolean isSet(int index) {
- assert index < numWords * 64 : "index (" + index + ") should < length (" + numWords * 64 + ")";
- return BitSetMethods.isSet(baseObject, baseOffset, index);
- }
-
- /**
- * Returns the index of the first bit that is set to true that occurs on or after the
- * specified starting index. If no such bit exists then {@code -1} is returned.
- * <p>
- * To iterate over the true bits in a BitSet, use the following loop:
- * <pre>
- * <code>
- * for (long i = bs.nextSetBit(0); i &gt;= 0; i = bs.nextSetBit(i + 1)) {
- * // operate on index i here
- * }
- * </code>
- * </pre>
- *
- * @param fromIndex the index to start checking from (inclusive)
- * @return the index of the next set bit, or -1 if there is no such bit
- */
- public int nextSetBit(int fromIndex) {
- return BitSetMethods.nextSetBit(baseObject, baseOffset, fromIndex, numWords);
- }
-
- /**
- * Returns {@code true} if any bit is set.
- */
- public boolean anySet() {
- return BitSetMethods.anySet(baseObject, baseOffset, numWords);
- }
-
-}
diff --git a/unsafe/src/test/java/org/apache/spark/unsafe/bitset/BitSetSuite.java b/unsafe/src/test/java/org/apache/spark/unsafe/bitset/BitSetSuite.java
deleted file mode 100644
index 14e38683df..0000000000
--- a/unsafe/src/test/java/org/apache/spark/unsafe/bitset/BitSetSuite.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.unsafe.bitset;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-import org.apache.spark.unsafe.memory.MemoryBlock;
-
-public class BitSetSuite {
-
- private static BitSet createBitSet(int capacity) {
- Assert.assertEquals(0, capacity % 64);
- return new BitSet(MemoryBlock.fromLongArray(new long[capacity / 64]));
- }
-
- @Test
- public void basicOps() {
- BitSet bs = createBitSet(64);
- Assert.assertEquals(64, bs.capacity());
-
- // Make sure the bit set starts empty.
- for (int i = 0; i < bs.capacity(); i++) {
- Assert.assertFalse(bs.isSet(i));
- }
- // another form of asserting that the bit set is empty
- Assert.assertFalse(bs.anySet());
-
- // Set every bit and check it.
- for (int i = 0; i < bs.capacity(); i++) {
- bs.set(i);
- Assert.assertTrue(bs.isSet(i));
- }
-
- // Unset every bit and check it.
- for (int i = 0; i < bs.capacity(); i++) {
- Assert.assertTrue(bs.isSet(i));
- bs.unset(i);
- Assert.assertFalse(bs.isSet(i));
- }
-
- // Make sure anySet() can detect any set bit
- bs = createBitSet(256);
- bs.set(64);
- Assert.assertTrue(bs.anySet());
- }
-
- @Test
- public void traversal() {
- BitSet bs = createBitSet(256);
-
- Assert.assertEquals(-1, bs.nextSetBit(0));
- Assert.assertEquals(-1, bs.nextSetBit(10));
- Assert.assertEquals(-1, bs.nextSetBit(64));
-
- bs.set(10);
- Assert.assertEquals(10, bs.nextSetBit(0));
- Assert.assertEquals(10, bs.nextSetBit(1));
- Assert.assertEquals(10, bs.nextSetBit(10));
- Assert.assertEquals(-1, bs.nextSetBit(11));
-
- bs.set(11);
- Assert.assertEquals(10, bs.nextSetBit(10));
- Assert.assertEquals(11, bs.nextSetBit(11));
-
- // Skip a whole word and find it
- bs.set(190);
- Assert.assertEquals(190, bs.nextSetBit(12));
-
- Assert.assertEquals(-1, bs.nextSetBit(191));
- Assert.assertEquals(-1, bs.nextSetBit(256));
- }
-}