author	Davies Liu <davies@databricks.com>	2015-11-04 14:45:02 -0800
committer	Josh Rosen <joshrosen@databricks.com>	2015-11-04 14:45:02 -0800
commit	1b6a5d4af9691c3f7f3ebee3146dc13d12a0e047 (patch)
tree	387ed802d0acb7508cbf1d6ffaf59e40b2a41cff /core
parent	701fb5052080fa8c0a79ad7c1e65693ccf444787 (diff)
[SPARK-11493] remove bitset from BytesToBytesMap
Since we store 4 bytes holding the number of records at the beginning of each page, a record address can never be zero, so we do not need the bitset.

As for performance: the bitset could speed up lookups that miss when the slot is empty (the bitset is smaller than the long array, so its cache hit rate is higher). In practice the map is filled to 35%–70% (take 50% as the average), so only half of the missed lookups can benefit from it, while all the others pay the cost of loading the bitset and still have to access the long array anyway. For aggregation we always need to access the long array (a missed lookup is followed by inserting the new key), which a benchmark also confirmed. For broadcast hash join there could be a regression, but a simple benchmark (in which most lookups miss) suggested there is none:

```
sqlContext.range(1<<20).write.parquet("small")
df = sqlContext.read.parquet('small')
for i in range(3):
    t = time.time()
    df2 = sqlContext.range(1<<26).selectExpr("id * 1111111111 % 987654321 as id2")
    df2.join(df, df.id == df2.id2).count()
    print time.time() - t
```

With the bitset (times in seconds):

```
17.5404241085
10.2758829594
10.5786800385
```

After removing the bitset (times in seconds):

```
21.8939979076
12.4132959843
9.97224712372
```

cc rxin nongli

Author: Davies Liu <davies@databricks.com>

Closes #9452 from davies/remove_bitset.
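To make the idea concrete, here is a minimal standalone sketch (hypothetical class and method names, not the actual BytesToBytesMap code) of an open-addressed table in which a zero entry in the long array itself marks an empty slot, so no separate bitset is consulted on lookup:

```java
// Sketch only: each logical slot occupies two longs in `slots`:
// slots[2*pos]   = encoded record address (non-zero for every live entry)
// slots[2*pos+1] = full 32-bit hashcode of the key
public final class NoBitsetLookupSketch {
  private final long[] slots;
  private final int mask; // capacity - 1; capacity is a power of two

  public NoBitsetLookupSketch(int capacity) {
    assert Integer.bitCount(capacity) == 1 : "capacity must be a power of two";
    this.slots = new long[capacity * 2]; // zero-filled: every slot starts empty
    this.mask = capacity - 1;
  }

  /**
   * Probe for `hashcode`. Returns the first position that is either empty
   * (address == 0, i.e. the key is absent and may be inserted here) or whose
   * stored hashcode matches (the real map would then compare the key bytes).
   */
  public int lookup(int hashcode) {
    int pos = hashcode & mask;
    int step = 1;
    while (true) {
      if (slots[pos * 2] == 0) {
        return pos; // empty slot found without touching any bitset
      }
      if ((int) slots[pos * 2 + 1] == hashcode) {
        return pos; // hashcode match: candidate slot
      }
      pos = (pos + step) & mask; // same probe sequence as the patch below
      step++;
    }
  }

  /** Insert at a position returned by a missed lookup. */
  public void put(int pos, long address, int hashcode) {
    assert address != 0 : "0 is reserved as the empty marker";
    slots[pos * 2] = address;
    slots[pos * 2 + 1] = hashcode;
  }
}
```

The invariant that makes this safe is exactly the one stated above: every record address points past the 4-byte record count at the start of its page, so a live address can never be 0.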
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java | 58
1 file changed, 15 insertions(+), 43 deletions(-)
diff --git a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
index e36709c6fc..07241c827c 100644
--- a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
+++ b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
@@ -35,7 +35,6 @@ import org.apache.spark.storage.BlockManager;
import org.apache.spark.unsafe.Platform;
import org.apache.spark.unsafe.array.ByteArrayMethods;
import org.apache.spark.unsafe.array.LongArray;
-import org.apache.spark.unsafe.bitset.BitSet;
import org.apache.spark.unsafe.hash.Murmur3_x86_32;
import org.apache.spark.unsafe.memory.MemoryBlock;
import org.apache.spark.unsafe.memory.MemoryLocation;
@@ -123,12 +122,6 @@ public final class BytesToBytesMap extends MemoryConsumer {
*/
private boolean canGrowArray = true;
- /**
- * A {@link BitSet} used to track location of the map where the key is set.
- * Size of the bitset should be half of the size of the long array.
- */
- @Nullable private BitSet bitset;
-
private final double loadFactor;
/**
@@ -427,7 +420,6 @@ public final class BytesToBytesMap extends MemoryConsumer {
* This is a thread-safe version of `lookup`, could be used by multiple threads.
*/
public void safeLookup(Object keyBase, long keyOffset, int keyLength, Location loc) {
- assert(bitset != null);
assert(longArray != null);
if (enablePerfMetrics) {
@@ -440,7 +432,7 @@ public final class BytesToBytesMap extends MemoryConsumer {
if (enablePerfMetrics) {
numProbes++;
}
- if (!bitset.isSet(pos)) {
+ if (longArray.get(pos * 2) == 0) {
// This is a new key.
loc.with(pos, hashcode, false);
return;
@@ -644,7 +636,6 @@ public final class BytesToBytesMap extends MemoryConsumer {
assert (!isDefined) : "Can only set value once for a key";
assert (keyLength % 8 == 0);
assert (valueLength % 8 == 0);
- assert(bitset != null);
assert(longArray != null);
if (numElements == MAX_CAPACITY || !canGrowArray) {
@@ -678,7 +669,6 @@ public final class BytesToBytesMap extends MemoryConsumer {
Platform.putInt(base, offset, Platform.getInt(base, offset) + 1);
pageCursor += recordLength;
numElements++;
- bitset.set(pos);
final long storedKeyAddress = taskMemoryManager.encodePageNumberAndOffset(
currentPage, recordOffset);
longArray.set(pos * 2, storedKeyAddress);
@@ -734,7 +724,6 @@ public final class BytesToBytesMap extends MemoryConsumer {
assert (capacity <= MAX_CAPACITY);
acquireMemory(capacity * 16);
longArray = new LongArray(MemoryBlock.fromLongArray(new long[capacity * 2]));
- bitset = new BitSet(MemoryBlock.fromLongArray(new long[capacity / 64]));
this.growthThreshold = (int) (capacity * loadFactor);
this.mask = capacity - 1;
@@ -749,7 +738,6 @@ public final class BytesToBytesMap extends MemoryConsumer {
long used = longArray.memoryBlock().size();
longArray = null;
releaseMemory(used);
- bitset = null;
}
}
@@ -795,9 +783,7 @@ public final class BytesToBytesMap extends MemoryConsumer {
for (MemoryBlock dataPage : dataPages) {
totalDataPagesSize += dataPage.size();
}
- return totalDataPagesSize +
- ((bitset != null) ? bitset.memoryBlock().size() : 0L) +
- ((longArray != null) ? longArray.memoryBlock().size() : 0L);
+ return totalDataPagesSize + ((longArray != null) ? longArray.memoryBlock().size() : 0L);
}
private void updatePeakMemoryUsed() {
@@ -852,7 +838,6 @@ public final class BytesToBytesMap extends MemoryConsumer {
*/
@VisibleForTesting
void growAndRehash() {
- assert(bitset != null);
assert(longArray != null);
long resizeStartTime = -1;
@@ -861,39 +846,26 @@ public final class BytesToBytesMap extends MemoryConsumer {
}
// Store references to the old data structures to be used when we re-hash
final LongArray oldLongArray = longArray;
- final BitSet oldBitSet = bitset;
- final int oldCapacity = (int) oldBitSet.capacity();
+ final int oldCapacity = (int) oldLongArray.size() / 2;
// Allocate the new data structures
- try {
- allocate(Math.min(growthStrategy.nextCapacity(oldCapacity), MAX_CAPACITY));
- } catch (OutOfMemoryError oom) {
- longArray = oldLongArray;
- bitset = oldBitSet;
- throw oom;
- }
+ allocate(Math.min(growthStrategy.nextCapacity(oldCapacity), MAX_CAPACITY));
// Re-mask (we don't recompute the hashcode because we stored all 32 bits of it)
- for (int pos = oldBitSet.nextSetBit(0); pos >= 0; pos = oldBitSet.nextSetBit(pos + 1)) {
- final long keyPointer = oldLongArray.get(pos * 2);
- final int hashcode = (int) oldLongArray.get(pos * 2 + 1);
+ for (int i = 0; i < oldLongArray.size(); i += 2) {
+ final long keyPointer = oldLongArray.get(i);
+ if (keyPointer == 0) {
+ continue;
+ }
+ final int hashcode = (int) oldLongArray.get(i + 1);
int newPos = hashcode & mask;
int step = 1;
- boolean keepGoing = true;
-
- // No need to check for equality here when we insert so this has one less if branch than
- // the similar code path in addWithoutResize.
- while (keepGoing) {
- if (!bitset.isSet(newPos)) {
- bitset.set(newPos);
- longArray.set(newPos * 2, keyPointer);
- longArray.set(newPos * 2 + 1, hashcode);
- keepGoing = false;
- } else {
- newPos = (newPos + step) & mask;
- step++;
- }
+ while (longArray.get(newPos * 2) != 0) {
+ newPos = (newPos + step) & mask;
+ step++;
}
+ longArray.set(newPos * 2, keyPointer);
+ longArray.set(newPos * 2 + 1, hashcode);
}
releaseMemory(oldLongArray.memoryBlock().size());
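For comparison, the rehash loop introduced by this patch can be read as the following standalone sketch (a hypothetical helper, not the actual method): scan the old long array two entries at a time, skip zero addresses, and reinsert the rest with the same probing scheme.

```java
// Sketch of growAndRehash without a bitset: occupancy is read directly from
// the old long array, where a zero key pointer now means "empty".
static long[] rehash(long[] old, int newCapacity) {
  assert Integer.bitCount(newCapacity) == 1 : "capacity must be a power of two";
  long[] fresh = new long[newCapacity * 2];
  int mask = newCapacity - 1;
  for (int i = 0; i < old.length; i += 2) {
    long keyPointer = old[i];
    if (keyPointer == 0) {
      continue; // empty slot: nothing to move
    }
    int hashcode = (int) old[i + 1];
    int pos = hashcode & mask;
    int step = 1;
    while (fresh[pos * 2] != 0) { // probe until a free slot in the new array
      pos = (pos + step) & mask;
      step++;
    }
    fresh[pos * 2] = keyPointer;
    fresh[pos * 2 + 1] = hashcode;
  }
  return fresh;
}
```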