diff options
3 files changed, 15 insertions, 244 deletions
diff --git a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java index e36709c6fc..07241c827c 100644 --- a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java +++ b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java @@ -35,7 +35,6 @@ import org.apache.spark.storage.BlockManager; import org.apache.spark.unsafe.Platform; import org.apache.spark.unsafe.array.ByteArrayMethods; import org.apache.spark.unsafe.array.LongArray; -import org.apache.spark.unsafe.bitset.BitSet; import org.apache.spark.unsafe.hash.Murmur3_x86_32; import org.apache.spark.unsafe.memory.MemoryBlock; import org.apache.spark.unsafe.memory.MemoryLocation; @@ -123,12 +122,6 @@ public final class BytesToBytesMap extends MemoryConsumer { */ private boolean canGrowArray = true; - /** - * A {@link BitSet} used to track location of the map where the key is set. - * Size of the bitset should be half of the size of the long array. - */ - @Nullable private BitSet bitset; - private final double loadFactor; /** @@ -427,7 +420,6 @@ public final class BytesToBytesMap extends MemoryConsumer { * This is a thread-safe version of `lookup`, could be used by multiple threads. */ public void safeLookup(Object keyBase, long keyOffset, int keyLength, Location loc) { - assert(bitset != null); assert(longArray != null); if (enablePerfMetrics) { @@ -440,7 +432,7 @@ public final class BytesToBytesMap extends MemoryConsumer { if (enablePerfMetrics) { numProbes++; } - if (!bitset.isSet(pos)) { + if (longArray.get(pos * 2) == 0) { // This is a new key. loc.with(pos, hashcode, false); return; @@ -644,7 +636,6 @@ public final class BytesToBytesMap extends MemoryConsumer { assert (!isDefined) : "Can only set value once for a key"; assert (keyLength % 8 == 0); assert (valueLength % 8 == 0); - assert(bitset != null); assert(longArray != null); if (numElements == MAX_CAPACITY || !canGrowArray) { @@ -678,7 +669,6 @@ public final class BytesToBytesMap extends MemoryConsumer { Platform.putInt(base, offset, Platform.getInt(base, offset) + 1); pageCursor += recordLength; numElements++; - bitset.set(pos); final long storedKeyAddress = taskMemoryManager.encodePageNumberAndOffset( currentPage, recordOffset); longArray.set(pos * 2, storedKeyAddress); @@ -734,7 +724,6 @@ public final class BytesToBytesMap extends MemoryConsumer { assert (capacity <= MAX_CAPACITY); acquireMemory(capacity * 16); longArray = new LongArray(MemoryBlock.fromLongArray(new long[capacity * 2])); - bitset = new BitSet(MemoryBlock.fromLongArray(new long[capacity / 64])); this.growthThreshold = (int) (capacity * loadFactor); this.mask = capacity - 1; @@ -749,7 +738,6 @@ public final class BytesToBytesMap extends MemoryConsumer { long used = longArray.memoryBlock().size(); longArray = null; releaseMemory(used); - bitset = null; } } @@ -795,9 +783,7 @@ public final class BytesToBytesMap extends MemoryConsumer { for (MemoryBlock dataPage : dataPages) { totalDataPagesSize += dataPage.size(); } - return totalDataPagesSize + - ((bitset != null) ? bitset.memoryBlock().size() : 0L) + - ((longArray != null) ? longArray.memoryBlock().size() : 0L); + return totalDataPagesSize + ((longArray != null) ? longArray.memoryBlock().size() : 0L); } private void updatePeakMemoryUsed() { @@ -852,7 +838,6 @@ public final class BytesToBytesMap extends MemoryConsumer { */ @VisibleForTesting void growAndRehash() { - assert(bitset != null); assert(longArray != null); long resizeStartTime = -1; @@ -861,39 +846,26 @@ public final class BytesToBytesMap extends MemoryConsumer { } // Store references to the old data structures to be used when we re-hash final LongArray oldLongArray = longArray; - final BitSet oldBitSet = bitset; - final int oldCapacity = (int) oldBitSet.capacity(); + final int oldCapacity = (int) oldLongArray.size() / 2; // Allocate the new data structures - try { - allocate(Math.min(growthStrategy.nextCapacity(oldCapacity), MAX_CAPACITY)); - } catch (OutOfMemoryError oom) { - longArray = oldLongArray; - bitset = oldBitSet; - throw oom; - } + allocate(Math.min(growthStrategy.nextCapacity(oldCapacity), MAX_CAPACITY)); // Re-mask (we don't recompute the hashcode because we stored all 32 bits of it) - for (int pos = oldBitSet.nextSetBit(0); pos >= 0; pos = oldBitSet.nextSetBit(pos + 1)) { - final long keyPointer = oldLongArray.get(pos * 2); - final int hashcode = (int) oldLongArray.get(pos * 2 + 1); + for (int i = 0; i < oldLongArray.size(); i += 2) { + final long keyPointer = oldLongArray.get(i); + if (keyPointer == 0) { + continue; + } + final int hashcode = (int) oldLongArray.get(i + 1); int newPos = hashcode & mask; int step = 1; - boolean keepGoing = true; - - // No need to check for equality here when we insert so this has one less if branch than - // the similar code path in addWithoutResize. - while (keepGoing) { - if (!bitset.isSet(newPos)) { - bitset.set(newPos); - longArray.set(newPos * 2, keyPointer); - longArray.set(newPos * 2 + 1, hashcode); - keepGoing = false; - } else { - newPos = (newPos + step) & mask; - step++; - } + while (longArray.get(newPos * 2) != 0) { + newPos = (newPos + step) & mask; + step++; } + longArray.set(newPos * 2, keyPointer); + longArray.set(newPos * 2 + 1, hashcode); } releaseMemory(oldLongArray.memoryBlock().size()); diff --git a/unsafe/src/main/java/org/apache/spark/unsafe/bitset/BitSet.java b/unsafe/src/main/java/org/apache/spark/unsafe/bitset/BitSet.java deleted file mode 100644 index 7c124173b0..0000000000 --- a/unsafe/src/main/java/org/apache/spark/unsafe/bitset/BitSet.java +++ /dev/null @@ -1,113 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.unsafe.bitset; - -import org.apache.spark.unsafe.array.LongArray; -import org.apache.spark.unsafe.memory.MemoryBlock; - -/** - * A fixed size uncompressed bit set backed by a {@link LongArray}. - * - * Each bit occupies exactly one bit of storage. - */ -public final class BitSet { - - /** A long array for the bits. */ - private final LongArray words; - - /** Length of the long array. */ - private final int numWords; - - private final Object baseObject; - private final long baseOffset; - - /** - * Creates a new {@link BitSet} using the specified memory block. Size of the memory block must be - * multiple of 8 bytes (i.e. 64 bits). - */ - public BitSet(MemoryBlock memory) { - words = new LongArray(memory); - assert (words.size() <= Integer.MAX_VALUE); - numWords = (int) words.size(); - baseObject = words.memoryBlock().getBaseObject(); - baseOffset = words.memoryBlock().getBaseOffset(); - } - - public MemoryBlock memoryBlock() { - return words.memoryBlock(); - } - - /** - * Returns the number of bits in this {@code BitSet}. - */ - public long capacity() { - return numWords * 64; - } - - /** - * Sets the bit at the specified index to {@code true}. - */ - public void set(int index) { - assert index < numWords * 64 : "index (" + index + ") should < length (" + numWords * 64 + ")"; - BitSetMethods.set(baseObject, baseOffset, index); - } - - /** - * Sets the bit at the specified index to {@code false}. - */ - public void unset(int index) { - assert index < numWords * 64 : "index (" + index + ") should < length (" + numWords * 64 + ")"; - BitSetMethods.unset(baseObject, baseOffset, index); - } - - /** - * Returns {@code true} if the bit is set at the specified index. - */ - public boolean isSet(int index) { - assert index < numWords * 64 : "index (" + index + ") should < length (" + numWords * 64 + ")"; - return BitSetMethods.isSet(baseObject, baseOffset, index); - } - - /** - * Returns the index of the first bit that is set to true that occurs on or after the - * specified starting index. If no such bit exists then {@code -1} is returned. - * <p> - * To iterate over the true bits in a BitSet, use the following loop: - * <pre> - * <code> - * for (long i = bs.nextSetBit(0); i >= 0; i = bs.nextSetBit(i + 1)) { - * // operate on index i here - * } - * </code> - * </pre> - * - * @param fromIndex the index to start checking from (inclusive) - * @return the index of the next set bit, or -1 if there is no such bit - */ - public int nextSetBit(int fromIndex) { - return BitSetMethods.nextSetBit(baseObject, baseOffset, fromIndex, numWords); - } - - /** - * Returns {@code true} if any bit is set. - */ - public boolean anySet() { - return BitSetMethods.anySet(baseObject, baseOffset, numWords); - } - -} diff --git a/unsafe/src/test/java/org/apache/spark/unsafe/bitset/BitSetSuite.java b/unsafe/src/test/java/org/apache/spark/unsafe/bitset/BitSetSuite.java deleted file mode 100644 index 14e38683df..0000000000 --- a/unsafe/src/test/java/org/apache/spark/unsafe/bitset/BitSetSuite.java +++ /dev/null @@ -1,88 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.unsafe.bitset; - -import org.junit.Assert; -import org.junit.Test; - -import org.apache.spark.unsafe.memory.MemoryBlock; - -public class BitSetSuite { - - private static BitSet createBitSet(int capacity) { - Assert.assertEquals(0, capacity % 64); - return new BitSet(MemoryBlock.fromLongArray(new long[capacity / 64])); - } - - @Test - public void basicOps() { - BitSet bs = createBitSet(64); - Assert.assertEquals(64, bs.capacity()); - - // Make sure the bit set starts empty. - for (int i = 0; i < bs.capacity(); i++) { - Assert.assertFalse(bs.isSet(i)); - } - // another form of asserting that the bit set is empty - Assert.assertFalse(bs.anySet()); - - // Set every bit and check it. - for (int i = 0; i < bs.capacity(); i++) { - bs.set(i); - Assert.assertTrue(bs.isSet(i)); - } - - // Unset every bit and check it. - for (int i = 0; i < bs.capacity(); i++) { - Assert.assertTrue(bs.isSet(i)); - bs.unset(i); - Assert.assertFalse(bs.isSet(i)); - } - - // Make sure anySet() can detect any set bit - bs = createBitSet(256); - bs.set(64); - Assert.assertTrue(bs.anySet()); - } - - @Test - public void traversal() { - BitSet bs = createBitSet(256); - - Assert.assertEquals(-1, bs.nextSetBit(0)); - Assert.assertEquals(-1, bs.nextSetBit(10)); - Assert.assertEquals(-1, bs.nextSetBit(64)); - - bs.set(10); - Assert.assertEquals(10, bs.nextSetBit(0)); - Assert.assertEquals(10, bs.nextSetBit(1)); - Assert.assertEquals(10, bs.nextSetBit(10)); - Assert.assertEquals(-1, bs.nextSetBit(11)); - - bs.set(11); - Assert.assertEquals(10, bs.nextSetBit(10)); - Assert.assertEquals(11, bs.nextSetBit(11)); - - // Skip a whole word and find it - bs.set(190); - Assert.assertEquals(190, bs.nextSetBit(12)); - - Assert.assertEquals(-1, bs.nextSetBit(191)); - Assert.assertEquals(-1, bs.nextSetBit(256)); - } -} |