aboutsummaryrefslogtreecommitdiff
path: root/unsafe/src
diff options
context:
space:
mode:
Diffstat (limited to 'unsafe/src')
-rw-r--r--unsafe/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java643
-rw-r--r--unsafe/src/main/java/org/apache/spark/unsafe/map/HashMapGrowthStrategy.java41
-rw-r--r--unsafe/src/main/java/org/apache/spark/unsafe/memory/MemoryBlock.java2
-rw-r--r--unsafe/src/main/java/org/apache/spark/unsafe/memory/TaskMemoryManager.java1
-rw-r--r--unsafe/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java385
-rw-r--r--unsafe/src/test/java/org/apache/spark/unsafe/map/BytesToBytesMapOffHeapSuite.java29
-rw-r--r--unsafe/src/test/java/org/apache/spark/unsafe/map/BytesToBytesMapOnHeapSuite.java29
7 files changed, 2 insertions, 1128 deletions
diff --git a/unsafe/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java b/unsafe/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
deleted file mode 100644
index 198e0684f3..0000000000
--- a/unsafe/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
+++ /dev/null
@@ -1,643 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.unsafe.map;
-
-import java.lang.Override;
-import java.lang.UnsupportedOperationException;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-
-import com.google.common.annotations.VisibleForTesting;
-
-import org.apache.spark.unsafe.*;
-import org.apache.spark.unsafe.array.ByteArrayMethods;
-import org.apache.spark.unsafe.array.LongArray;
-import org.apache.spark.unsafe.bitset.BitSet;
-import org.apache.spark.unsafe.hash.Murmur3_x86_32;
-import org.apache.spark.unsafe.memory.*;
-
-/**
- * An append-only hash map where keys and values are contiguous regions of bytes.
- * <p>
- * This is backed by a power-of-2-sized hash table, using quadratic probing with triangular numbers,
- * which is guaranteed to exhaust the space.
- * <p>
- * The map can support up to 2^29 keys. If the key cardinality is higher than this, you should
- * probably be using sorting instead of hashing for better cache locality.
- * <p>
- * This class is not thread safe.
- */
-public final class BytesToBytesMap {
-
- private static final Murmur3_x86_32 HASHER = new Murmur3_x86_32(0);
-
- private static final HashMapGrowthStrategy growthStrategy = HashMapGrowthStrategy.DOUBLING;
-
- /**
- * Special record length that is placed after the last record in a data page.
- */
- private static final int END_OF_PAGE_MARKER = -1;
-
- private final TaskMemoryManager memoryManager;
-
- /**
- * A linked list for tracking all allocated data pages so that we can free all of our memory.
- */
- private final List<MemoryBlock> dataPages = new LinkedList<MemoryBlock>();
-
- /**
- * The data page that will be used to store keys and values for new hashtable entries. When this
- * page becomes full, a new page will be allocated and this pointer will change to point to that
- * new page.
- */
- private MemoryBlock currentDataPage = null;
-
- /**
- * Offset into `currentDataPage` that points to the location where new data can be inserted into
- * the page. This does not incorporate the page's base offset.
- */
- private long pageCursor = 0;
-
- /**
- * The maximum number of keys that BytesToBytesMap supports. The hash table has to be
- * power-of-2-sized and its backing Java array can contain at most (1 << 30) elements, since
- * that's the largest power-of-2 that's less than Integer.MAX_VALUE. We need two long array
- * entries per key, giving us a maximum capacity of (1 << 29).
- */
- @VisibleForTesting
- static final int MAX_CAPACITY = (1 << 29);
-
- // This choice of page table size and page size means that we can address up to 500 gigabytes
- // of memory.
-
- /**
- * A single array to store the key and value.
- *
- * Position {@code 2 * i} in the array is used to track a pointer to the key at index {@code i},
- * while position {@code 2 * i + 1} in the array holds key's full 32-bit hashcode.
- */
- private LongArray longArray;
- // TODO: we're wasting 32 bits of space here; we can probably store fewer bits of the hashcode
- // and exploit word-alignment to use fewer bits to hold the address. This might let us store
- // only one long per map entry, increasing the chance that this array will fit in cache at the
- // expense of maybe performing more lookups if we have hash collisions. Say that we stored only
- // 27 bits of the hashcode and 37 bits of the address. 37 bits is enough to address 1 terabyte
- // of RAM given word-alignment. If we use 13 bits of this for our page table, that gives us a
- // maximum page size of 2^24 * 8 = ~134 megabytes per page. This change will require us to store
- // full base addresses in the page table for off-heap mode so that we can reconstruct the full
- // absolute memory addresses.
-
- /**
- * A {@link BitSet} used to track location of the map where the key is set.
- * Size of the bitset should be half of the size of the long array.
- */
- private BitSet bitset;
-
- private final double loadFactor;
-
- /**
- * The size of the data pages that hold key and value data. Map entries cannot span multiple
- * pages, so this limits the maximum entry size.
- */
- private final long pageSizeBytes;
-
- /**
- * Number of keys defined in the map.
- */
- private int size;
-
- /**
- * The map will be expanded once the number of keys exceeds this threshold.
- */
- private int growthThreshold;
-
- /**
- * Mask for truncating hashcodes so that they do not exceed the long array's size.
- * This is a strength reduction optimization; we're essentially performing a modulus operation,
- * but doing so with a bitmask because this is a power-of-2-sized hash map.
- */
- private int mask;
-
- /**
- * Return value of {@link BytesToBytesMap#lookup(Object, long, int)}.
- */
- private final Location loc;
-
- private final boolean enablePerfMetrics;
-
- private long timeSpentResizingNs = 0;
-
- private long numProbes = 0;
-
- private long numKeyLookups = 0;
-
- private long numHashCollisions = 0;
-
- public BytesToBytesMap(
- TaskMemoryManager memoryManager,
- int initialCapacity,
- double loadFactor,
- long pageSizeBytes,
- boolean enablePerfMetrics) {
- this.memoryManager = memoryManager;
- this.loadFactor = loadFactor;
- this.loc = new Location();
- this.pageSizeBytes = pageSizeBytes;
- this.enablePerfMetrics = enablePerfMetrics;
- if (initialCapacity <= 0) {
- throw new IllegalArgumentException("Initial capacity must be greater than 0");
- }
- if (initialCapacity > MAX_CAPACITY) {
- throw new IllegalArgumentException(
- "Initial capacity " + initialCapacity + " exceeds maximum capacity of " + MAX_CAPACITY);
- }
- if (pageSizeBytes > TaskMemoryManager.MAXIMUM_PAGE_SIZE_BYTES) {
- throw new IllegalArgumentException("Page size " + pageSizeBytes + " cannot exceed " +
- TaskMemoryManager.MAXIMUM_PAGE_SIZE_BYTES);
- }
- allocate(initialCapacity);
- }
-
- public BytesToBytesMap(
- TaskMemoryManager memoryManager,
- int initialCapacity,
- long pageSizeBytes) {
- this(memoryManager, initialCapacity, 0.70, pageSizeBytes, false);
- }
-
- public BytesToBytesMap(
- TaskMemoryManager memoryManager,
- int initialCapacity,
- long pageSizeBytes,
- boolean enablePerfMetrics) {
- this(memoryManager, initialCapacity, 0.70, pageSizeBytes, enablePerfMetrics);
- }
-
- /**
- * Returns the number of keys defined in the map.
- */
- public int size() { return size; }
-
- private static final class BytesToBytesMapIterator implements Iterator<Location> {
-
- private final int numRecords;
- private final Iterator<MemoryBlock> dataPagesIterator;
- private final Location loc;
-
- private int currentRecordNumber = 0;
- private Object pageBaseObject;
- private long offsetInPage;
-
- BytesToBytesMapIterator(int numRecords, Iterator<MemoryBlock> dataPagesIterator, Location loc) {
- this.numRecords = numRecords;
- this.dataPagesIterator = dataPagesIterator;
- this.loc = loc;
- if (dataPagesIterator.hasNext()) {
- advanceToNextPage();
- }
- }
-
- private void advanceToNextPage() {
- final MemoryBlock currentPage = dataPagesIterator.next();
- pageBaseObject = currentPage.getBaseObject();
- offsetInPage = currentPage.getBaseOffset();
- }
-
- @Override
- public boolean hasNext() {
- return currentRecordNumber != numRecords;
- }
-
- @Override
- public Location next() {
- int keyLength = (int) PlatformDependent.UNSAFE.getLong(pageBaseObject, offsetInPage);
- if (keyLength == END_OF_PAGE_MARKER) {
- advanceToNextPage();
- keyLength = (int) PlatformDependent.UNSAFE.getLong(pageBaseObject, offsetInPage);
- }
- loc.with(pageBaseObject, offsetInPage);
- offsetInPage += 8 + 8 + keyLength + loc.getValueLength();
- currentRecordNumber++;
- return loc;
- }
-
- @Override
- public void remove() {
- throw new UnsupportedOperationException();
- }
- }
-
- /**
- * Returns an iterator for iterating over the entries of this map.
- *
- * For efficiency, all calls to `next()` will return the same {@link Location} object.
- *
- * If any other lookups or operations are performed on this map while iterating over it, including
- * `lookup()`, the behavior of the returned iterator is undefined.
- */
- public Iterator<Location> iterator() {
- return new BytesToBytesMapIterator(size, dataPages.iterator(), loc);
- }
-
- /**
- * Looks up a key, and return a {@link Location} handle that can be used to test existence
- * and read/write values.
- *
- * This function always return the same {@link Location} instance to avoid object allocation.
- */
- public Location lookup(
- Object keyBaseObject,
- long keyBaseOffset,
- int keyRowLengthBytes) {
- if (enablePerfMetrics) {
- numKeyLookups++;
- }
- final int hashcode = HASHER.hashUnsafeWords(keyBaseObject, keyBaseOffset, keyRowLengthBytes);
- int pos = hashcode & mask;
- int step = 1;
- while (true) {
- if (enablePerfMetrics) {
- numProbes++;
- }
- if (!bitset.isSet(pos)) {
- // This is a new key.
- return loc.with(pos, hashcode, false);
- } else {
- long stored = longArray.get(pos * 2 + 1);
- if ((int) (stored) == hashcode) {
- // Full hash code matches. Let's compare the keys for equality.
- loc.with(pos, hashcode, true);
- if (loc.getKeyLength() == keyRowLengthBytes) {
- final MemoryLocation keyAddress = loc.getKeyAddress();
- final Object storedKeyBaseObject = keyAddress.getBaseObject();
- final long storedKeyBaseOffset = keyAddress.getBaseOffset();
- final boolean areEqual = ByteArrayMethods.arrayEquals(
- keyBaseObject,
- keyBaseOffset,
- storedKeyBaseObject,
- storedKeyBaseOffset,
- keyRowLengthBytes
- );
- if (areEqual) {
- return loc;
- } else {
- if (enablePerfMetrics) {
- numHashCollisions++;
- }
- }
- }
- }
- }
- pos = (pos + step) & mask;
- step++;
- }
- }
-
- /**
- * Handle returned by {@link BytesToBytesMap#lookup(Object, long, int)} function.
- */
- public final class Location {
- /** An index into the hash map's Long array */
- private int pos;
- /** True if this location points to a position where a key is defined, false otherwise */
- private boolean isDefined;
- /**
- * The hashcode of the most recent key passed to
- * {@link BytesToBytesMap#lookup(Object, long, int)}. Caching this hashcode here allows us to
- * avoid re-hashing the key when storing a value for that key.
- */
- private int keyHashcode;
- private final MemoryLocation keyMemoryLocation = new MemoryLocation();
- private final MemoryLocation valueMemoryLocation = new MemoryLocation();
- private int keyLength;
- private int valueLength;
-
- private void updateAddressesAndSizes(long fullKeyAddress) {
- updateAddressesAndSizes(
- memoryManager.getPage(fullKeyAddress), memoryManager.getOffsetInPage(fullKeyAddress));
- }
-
- private void updateAddressesAndSizes(Object page, long keyOffsetInPage) {
- long position = keyOffsetInPage;
- keyLength = (int) PlatformDependent.UNSAFE.getLong(page, position);
- position += 8; // word used to store the key size
- keyMemoryLocation.setObjAndOffset(page, position);
- position += keyLength;
- valueLength = (int) PlatformDependent.UNSAFE.getLong(page, position);
- position += 8; // word used to store the key size
- valueMemoryLocation.setObjAndOffset(page, position);
- }
-
- Location with(int pos, int keyHashcode, boolean isDefined) {
- this.pos = pos;
- this.isDefined = isDefined;
- this.keyHashcode = keyHashcode;
- if (isDefined) {
- final long fullKeyAddress = longArray.get(pos * 2);
- updateAddressesAndSizes(fullKeyAddress);
- }
- return this;
- }
-
- Location with(Object page, long keyOffsetInPage) {
- this.isDefined = true;
- updateAddressesAndSizes(page, keyOffsetInPage);
- return this;
- }
-
- /**
- * Returns true if the key is defined at this position, and false otherwise.
- */
- public boolean isDefined() {
- return isDefined;
- }
-
- /**
- * Returns the address of the key defined at this position.
- * This points to the first byte of the key data.
- * Unspecified behavior if the key is not defined.
- * For efficiency reasons, calls to this method always returns the same MemoryLocation object.
- */
- public MemoryLocation getKeyAddress() {
- assert (isDefined);
- return keyMemoryLocation;
- }
-
- /**
- * Returns the length of the key defined at this position.
- * Unspecified behavior if the key is not defined.
- */
- public int getKeyLength() {
- assert (isDefined);
- return keyLength;
- }
-
- /**
- * Returns the address of the value defined at this position.
- * This points to the first byte of the value data.
- * Unspecified behavior if the key is not defined.
- * For efficiency reasons, calls to this method always returns the same MemoryLocation object.
- */
- public MemoryLocation getValueAddress() {
- assert (isDefined);
- return valueMemoryLocation;
- }
-
- /**
- * Returns the length of the value defined at this position.
- * Unspecified behavior if the key is not defined.
- */
- public int getValueLength() {
- assert (isDefined);
- return valueLength;
- }
-
- /**
- * Store a new key and value. This method may only be called once for a given key; if you want
- * to update the value associated with a key, then you can directly manipulate the bytes stored
- * at the value address.
- * <p>
- * It is only valid to call this method immediately after calling `lookup()` using the same key.
- * </p>
- * <p>
- * The key and value must be word-aligned (that is, their sizes must multiples of 8).
- * </p>
- * <p>
- * After calling this method, calls to `get[Key|Value]Address()` and `get[Key|Value]Length`
- * will return information on the data stored by this `putNewKey` call.
- * </p>
- * <p>
- * As an example usage, here's the proper way to store a new key:
- * </p>
- * <pre>
- * Location loc = map.lookup(keyBaseObject, keyBaseOffset, keyLengthInBytes);
- * if (!loc.isDefined()) {
- * loc.putNewKey(keyBaseObject, keyBaseOffset, keyLengthInBytes, ...)
- * }
- * </pre>
- * <p>
- * Unspecified behavior if the key is not defined.
- * </p>
- */
- public void putNewKey(
- Object keyBaseObject,
- long keyBaseOffset,
- int keyLengthBytes,
- Object valueBaseObject,
- long valueBaseOffset,
- int valueLengthBytes) {
- assert (!isDefined) : "Can only set value once for a key";
- assert (keyLengthBytes % 8 == 0);
- assert (valueLengthBytes % 8 == 0);
- if (size == MAX_CAPACITY) {
- throw new IllegalStateException("BytesToBytesMap has reached maximum capacity");
- }
- // Here, we'll copy the data into our data pages. Because we only store a relative offset from
- // the key address instead of storing the absolute address of the value, the key and value
- // must be stored in the same memory page.
- // (8 byte key length) (key) (8 byte value length) (value)
- final long requiredSize = 8 + keyLengthBytes + 8 + valueLengthBytes;
- assert (requiredSize <= pageSizeBytes - 8); // Reserve 8 bytes for the end-of-page marker.
- size++;
- bitset.set(pos);
-
- // If there's not enough space in the current page, allocate a new page (8 bytes are reserved
- // for the end-of-page marker).
- if (currentDataPage == null || pageSizeBytes - 8 - pageCursor < requiredSize) {
- if (currentDataPage != null) {
- // There wasn't enough space in the current page, so write an end-of-page marker:
- final Object pageBaseObject = currentDataPage.getBaseObject();
- final long lengthOffsetInPage = currentDataPage.getBaseOffset() + pageCursor;
- PlatformDependent.UNSAFE.putLong(pageBaseObject, lengthOffsetInPage, END_OF_PAGE_MARKER);
- }
- MemoryBlock newPage = memoryManager.allocatePage(pageSizeBytes);
- dataPages.add(newPage);
- pageCursor = 0;
- currentDataPage = newPage;
- }
-
- // Compute all of our offsets up-front:
- final Object pageBaseObject = currentDataPage.getBaseObject();
- final long pageBaseOffset = currentDataPage.getBaseOffset();
- final long keySizeOffsetInPage = pageBaseOffset + pageCursor;
- pageCursor += 8; // word used to store the key size
- final long keyDataOffsetInPage = pageBaseOffset + pageCursor;
- pageCursor += keyLengthBytes;
- final long valueSizeOffsetInPage = pageBaseOffset + pageCursor;
- pageCursor += 8; // word used to store the value size
- final long valueDataOffsetInPage = pageBaseOffset + pageCursor;
- pageCursor += valueLengthBytes;
-
- // Copy the key
- PlatformDependent.UNSAFE.putLong(pageBaseObject, keySizeOffsetInPage, keyLengthBytes);
- PlatformDependent.copyMemory(
- keyBaseObject, keyBaseOffset, pageBaseObject, keyDataOffsetInPage, keyLengthBytes);
- // Copy the value
- PlatformDependent.UNSAFE.putLong(pageBaseObject, valueSizeOffsetInPage, valueLengthBytes);
- PlatformDependent.copyMemory(
- valueBaseObject, valueBaseOffset, pageBaseObject, valueDataOffsetInPage, valueLengthBytes);
-
- final long storedKeyAddress = memoryManager.encodePageNumberAndOffset(
- currentDataPage, keySizeOffsetInPage);
- longArray.set(pos * 2, storedKeyAddress);
- longArray.set(pos * 2 + 1, keyHashcode);
- updateAddressesAndSizes(storedKeyAddress);
- isDefined = true;
- if (size > growthThreshold && longArray.size() < MAX_CAPACITY) {
- growAndRehash();
- }
- }
- }
-
- /**
- * Allocate new data structures for this map. When calling this outside of the constructor,
- * make sure to keep references to the old data structures so that you can free them.
- *
- * @param capacity the new map capacity
- */
- private void allocate(int capacity) {
- assert (capacity >= 0);
- // The capacity needs to be divisible by 64 so that our bit set can be sized properly
- capacity = Math.max((int) Math.min(MAX_CAPACITY, nextPowerOf2(capacity)), 64);
- assert (capacity <= MAX_CAPACITY);
- longArray = new LongArray(memoryManager.allocate(capacity * 8L * 2));
- bitset = new BitSet(MemoryBlock.fromLongArray(new long[capacity / 64]));
-
- this.growthThreshold = (int) (capacity * loadFactor);
- this.mask = capacity - 1;
- }
-
- /**
- * Free all allocated memory associated with this map, including the storage for keys and values
- * as well as the hash map array itself.
- *
- * This method is idempotent.
- */
- public void free() {
- if (longArray != null) {
- memoryManager.free(longArray.memoryBlock());
- longArray = null;
- }
- if (bitset != null) {
- // The bitset's heap memory isn't managed by a memory manager, so no need to free it here.
- bitset = null;
- }
- Iterator<MemoryBlock> dataPagesIterator = dataPages.iterator();
- while (dataPagesIterator.hasNext()) {
- memoryManager.freePage(dataPagesIterator.next());
- dataPagesIterator.remove();
- }
- assert(dataPages.isEmpty());
- }
-
- /** Returns the total amount of memory, in bytes, consumed by this map's managed structures. */
- public long getTotalMemoryConsumption() {
- long totalDataPagesSize = 0L;
- for (MemoryBlock dataPage : dataPages) {
- totalDataPagesSize += dataPage.size();
- }
- return totalDataPagesSize + bitset.memoryBlock().size() + longArray.memoryBlock().size();
- }
-
- /**
- * Returns the total amount of time spent resizing this map (in nanoseconds).
- */
- public long getTimeSpentResizingNs() {
- if (!enablePerfMetrics) {
- throw new IllegalStateException();
- }
- return timeSpentResizingNs;
- }
-
-
- /**
- * Returns the average number of probes per key lookup.
- */
- public double getAverageProbesPerLookup() {
- if (!enablePerfMetrics) {
- throw new IllegalStateException();
- }
- return (1.0 * numProbes) / numKeyLookups;
- }
-
- public long getNumHashCollisions() {
- if (!enablePerfMetrics) {
- throw new IllegalStateException();
- }
- return numHashCollisions;
- }
-
- @VisibleForTesting
- int getNumDataPages() {
- return dataPages.size();
- }
-
- /**
- * Grows the size of the hash table and re-hash everything.
- */
- @VisibleForTesting
- void growAndRehash() {
- long resizeStartTime = -1;
- if (enablePerfMetrics) {
- resizeStartTime = System.nanoTime();
- }
- // Store references to the old data structures to be used when we re-hash
- final LongArray oldLongArray = longArray;
- final BitSet oldBitSet = bitset;
- final int oldCapacity = (int) oldBitSet.capacity();
-
- // Allocate the new data structures
- allocate(Math.min(growthStrategy.nextCapacity(oldCapacity), MAX_CAPACITY));
-
- // Re-mask (we don't recompute the hashcode because we stored all 32 bits of it)
- for (int pos = oldBitSet.nextSetBit(0); pos >= 0; pos = oldBitSet.nextSetBit(pos + 1)) {
- final long keyPointer = oldLongArray.get(pos * 2);
- final int hashcode = (int) oldLongArray.get(pos * 2 + 1);
- int newPos = hashcode & mask;
- int step = 1;
- boolean keepGoing = true;
-
- // No need to check for equality here when we insert so this has one less if branch than
- // the similar code path in addWithoutResize.
- while (keepGoing) {
- if (!bitset.isSet(newPos)) {
- bitset.set(newPos);
- longArray.set(newPos * 2, keyPointer);
- longArray.set(newPos * 2 + 1, hashcode);
- keepGoing = false;
- } else {
- newPos = (newPos + step) & mask;
- step++;
- }
- }
- }
-
- // Deallocate the old data structures.
- memoryManager.free(oldLongArray.memoryBlock());
- if (enablePerfMetrics) {
- timeSpentResizingNs += System.nanoTime() - resizeStartTime;
- }
- }
-
- /** Returns the next number greater or equal num that is power of 2. */
- private static long nextPowerOf2(long num) {
- final long highBit = Long.highestOneBit(num);
- return (highBit == num) ? num : highBit << 1;
- }
-}
diff --git a/unsafe/src/main/java/org/apache/spark/unsafe/map/HashMapGrowthStrategy.java b/unsafe/src/main/java/org/apache/spark/unsafe/map/HashMapGrowthStrategy.java
deleted file mode 100644
index 20654e4eea..0000000000
--- a/unsafe/src/main/java/org/apache/spark/unsafe/map/HashMapGrowthStrategy.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.unsafe.map;
-
-/**
- * Interface that defines how we can grow the size of a hash map when it is over a threshold.
- */
-public interface HashMapGrowthStrategy {
-
- int nextCapacity(int currentCapacity);
-
- /**
- * Double the size of the hash map every time.
- */
- HashMapGrowthStrategy DOUBLING = new Doubling();
-
- class Doubling implements HashMapGrowthStrategy {
- @Override
- public int nextCapacity(int currentCapacity) {
- assert (currentCapacity > 0);
- // Guard against overflow
- return (currentCapacity * 2 > 0) ? (currentCapacity * 2) : Integer.MAX_VALUE;
- }
- }
-
-}
diff --git a/unsafe/src/main/java/org/apache/spark/unsafe/memory/MemoryBlock.java b/unsafe/src/main/java/org/apache/spark/unsafe/memory/MemoryBlock.java
index 3dc82d8c2e..91be46ba21 100644
--- a/unsafe/src/main/java/org/apache/spark/unsafe/memory/MemoryBlock.java
+++ b/unsafe/src/main/java/org/apache/spark/unsafe/memory/MemoryBlock.java
@@ -34,7 +34,7 @@ public class MemoryBlock extends MemoryLocation {
*/
int pageNumber = -1;
- MemoryBlock(@Nullable Object obj, long offset, long length) {
+ public MemoryBlock(@Nullable Object obj, long offset, long length) {
super(obj, offset);
this.length = length;
}
diff --git a/unsafe/src/main/java/org/apache/spark/unsafe/memory/TaskMemoryManager.java b/unsafe/src/main/java/org/apache/spark/unsafe/memory/TaskMemoryManager.java
index dd70df3b1f..358bb37250 100644
--- a/unsafe/src/main/java/org/apache/spark/unsafe/memory/TaskMemoryManager.java
+++ b/unsafe/src/main/java/org/apache/spark/unsafe/memory/TaskMemoryManager.java
@@ -164,6 +164,7 @@ public class TaskMemoryManager {
* top-level Javadoc for more details).
*/
public MemoryBlock allocate(long size) throws OutOfMemoryError {
+ assert(size > 0) : "Size must be positive, but got " + size;
final MemoryBlock memory = executorMemoryManager.allocate(size);
allocatedNonPageMemory.add(memory);
return memory;
diff --git a/unsafe/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java b/unsafe/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java
deleted file mode 100644
index 0be94ad371..0000000000
--- a/unsafe/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java
+++ /dev/null
@@ -1,385 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.unsafe.map;
-
-import java.lang.Exception;
-import java.nio.ByteBuffer;
-import java.util.*;
-
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-import static org.mockito.AdditionalMatchers.geq;
-import static org.mockito.Mockito.*;
-
-import org.apache.spark.unsafe.array.ByteArrayMethods;
-import org.apache.spark.unsafe.memory.*;
-import org.apache.spark.unsafe.PlatformDependent;
-import static org.apache.spark.unsafe.PlatformDependent.BYTE_ARRAY_OFFSET;
-import static org.apache.spark.unsafe.PlatformDependent.LONG_ARRAY_OFFSET;
-
-
-public abstract class AbstractBytesToBytesMapSuite {
-
- private final Random rand = new Random(42);
-
- private TaskMemoryManager memoryManager;
- private TaskMemoryManager sizeLimitedMemoryManager;
- private final long PAGE_SIZE_BYTES = 1L << 26; // 64 megabytes
-
- @Before
- public void setup() {
- memoryManager = new TaskMemoryManager(new ExecutorMemoryManager(getMemoryAllocator()));
- // Mocked memory manager for tests that check the maximum array size, since actually allocating
- // such large arrays will cause us to run out of memory in our tests.
- sizeLimitedMemoryManager = spy(memoryManager);
- when(sizeLimitedMemoryManager.allocate(geq(1L << 20))).thenAnswer(new Answer<MemoryBlock>() {
- @Override
- public MemoryBlock answer(InvocationOnMock invocation) throws Throwable {
- if (((Long) invocation.getArguments()[0] / 8) > Integer.MAX_VALUE) {
- throw new OutOfMemoryError("Requested array size exceeds VM limit");
- }
- return memoryManager.allocate(1L << 20);
- }
- });
- }
-
- @After
- public void tearDown() {
- if (memoryManager != null) {
- memoryManager.cleanUpAllAllocatedMemory();
- memoryManager = null;
- }
- }
-
- protected abstract MemoryAllocator getMemoryAllocator();
-
- private static byte[] getByteArray(MemoryLocation loc, int size) {
- final byte[] arr = new byte[size];
- PlatformDependent.copyMemory(
- loc.getBaseObject(),
- loc.getBaseOffset(),
- arr,
- BYTE_ARRAY_OFFSET,
- size
- );
- return arr;
- }
-
- private byte[] getRandomByteArray(int numWords) {
- Assert.assertTrue(numWords > 0);
- final int lengthInBytes = numWords * 8;
- final byte[] bytes = new byte[lengthInBytes];
- rand.nextBytes(bytes);
- return bytes;
- }
-
- /**
- * Fast equality checking for byte arrays, since these comparisons are a bottleneck
- * in our stress tests.
- */
- private static boolean arrayEquals(
- byte[] expected,
- MemoryLocation actualAddr,
- long actualLengthBytes) {
- return (actualLengthBytes == expected.length) && ByteArrayMethods.arrayEquals(
- expected,
- BYTE_ARRAY_OFFSET,
- actualAddr.getBaseObject(),
- actualAddr.getBaseOffset(),
- expected.length
- );
- }
-
- @Test
- public void emptyMap() {
- BytesToBytesMap map = new BytesToBytesMap(memoryManager, 64, PAGE_SIZE_BYTES);
- try {
- Assert.assertEquals(0, map.size());
- final int keyLengthInWords = 10;
- final int keyLengthInBytes = keyLengthInWords * 8;
- final byte[] key = getRandomByteArray(keyLengthInWords);
- Assert.assertFalse(map.lookup(key, BYTE_ARRAY_OFFSET, keyLengthInBytes).isDefined());
- Assert.assertFalse(map.iterator().hasNext());
- } finally {
- map.free();
- }
- }
-
- @Test
- public void setAndRetrieveAKey() {
- BytesToBytesMap map = new BytesToBytesMap(memoryManager, 64, PAGE_SIZE_BYTES);
- final int recordLengthWords = 10;
- final int recordLengthBytes = recordLengthWords * 8;
- final byte[] keyData = getRandomByteArray(recordLengthWords);
- final byte[] valueData = getRandomByteArray(recordLengthWords);
- try {
- final BytesToBytesMap.Location loc =
- map.lookup(keyData, BYTE_ARRAY_OFFSET, recordLengthBytes);
- Assert.assertFalse(loc.isDefined());
- loc.putNewKey(
- keyData,
- BYTE_ARRAY_OFFSET,
- recordLengthBytes,
- valueData,
- BYTE_ARRAY_OFFSET,
- recordLengthBytes
- );
- // After storing the key and value, the other location methods should return results that
- // reflect the result of this store without us having to call lookup() again on the same key.
- Assert.assertEquals(recordLengthBytes, loc.getKeyLength());
- Assert.assertEquals(recordLengthBytes, loc.getValueLength());
- Assert.assertArrayEquals(keyData, getByteArray(loc.getKeyAddress(), recordLengthBytes));
- Assert.assertArrayEquals(valueData, getByteArray(loc.getValueAddress(), recordLengthBytes));
-
- // After calling lookup() the location should still point to the correct data.
- Assert.assertTrue(map.lookup(keyData, BYTE_ARRAY_OFFSET, recordLengthBytes).isDefined());
- Assert.assertEquals(recordLengthBytes, loc.getKeyLength());
- Assert.assertEquals(recordLengthBytes, loc.getValueLength());
- Assert.assertArrayEquals(keyData, getByteArray(loc.getKeyAddress(), recordLengthBytes));
- Assert.assertArrayEquals(valueData, getByteArray(loc.getValueAddress(), recordLengthBytes));
-
- try {
- loc.putNewKey(
- keyData,
- BYTE_ARRAY_OFFSET,
- recordLengthBytes,
- valueData,
- BYTE_ARRAY_OFFSET,
- recordLengthBytes
- );
- Assert.fail("Should not be able to set a new value for a key");
- } catch (AssertionError e) {
- // Expected exception; do nothing.
- }
- } finally {
- map.free();
- }
- }
-
- @Test
- public void iteratorTest() throws Exception {
- final int size = 4096;
- BytesToBytesMap map = new BytesToBytesMap(memoryManager, size / 2, PAGE_SIZE_BYTES);
- try {
- for (long i = 0; i < size; i++) {
- final long[] value = new long[] { i };
- final BytesToBytesMap.Location loc =
- map.lookup(value, PlatformDependent.LONG_ARRAY_OFFSET, 8);
- Assert.assertFalse(loc.isDefined());
- // Ensure that we store some zero-length keys
- if (i % 5 == 0) {
- loc.putNewKey(
- null,
- PlatformDependent.LONG_ARRAY_OFFSET,
- 0,
- value,
- PlatformDependent.LONG_ARRAY_OFFSET,
- 8
- );
- } else {
- loc.putNewKey(
- value,
- PlatformDependent.LONG_ARRAY_OFFSET,
- 8,
- value,
- PlatformDependent.LONG_ARRAY_OFFSET,
- 8
- );
- }
- }
- final java.util.BitSet valuesSeen = new java.util.BitSet(size);
- final Iterator<BytesToBytesMap.Location> iter = map.iterator();
- while (iter.hasNext()) {
- final BytesToBytesMap.Location loc = iter.next();
- Assert.assertTrue(loc.isDefined());
- final MemoryLocation keyAddress = loc.getKeyAddress();
- final MemoryLocation valueAddress = loc.getValueAddress();
- final long value = PlatformDependent.UNSAFE.getLong(
- valueAddress.getBaseObject(), valueAddress.getBaseOffset());
- final long keyLength = loc.getKeyLength();
- if (keyLength == 0) {
- Assert.assertTrue("value " + value + " was not divisible by 5", value % 5 == 0);
- } else {
- final long key = PlatformDependent.UNSAFE.getLong(
- keyAddress.getBaseObject(), keyAddress.getBaseOffset());
- Assert.assertEquals(value, key);
- }
- valuesSeen.set((int) value);
- }
- Assert.assertEquals(size, valuesSeen.cardinality());
- } finally {
- map.free();
- }
- }
-
- @Test
- public void iteratingOverDataPagesWithWastedSpace() throws Exception {
- final int NUM_ENTRIES = 1000 * 1000;
- final int KEY_LENGTH = 16;
- final int VALUE_LENGTH = 40;
- final BytesToBytesMap map = new BytesToBytesMap(memoryManager, NUM_ENTRIES, PAGE_SIZE_BYTES);
- // Each record will take 8 + 8 + 16 + 40 = 72 bytes of space in the data page. Our 64-megabyte
- // pages won't be evenly-divisible by records of this size, which will cause us to waste some
- // space at the end of the page. This is necessary in order for us to take the end-of-record
- // handling branch in iterator().
- try {
- for (int i = 0; i < NUM_ENTRIES; i++) {
- final long[] key = new long[] { i, i }; // 2 * 8 = 16 bytes
- final long[] value = new long[] { i, i, i, i, i }; // 5 * 8 = 40 bytes
- final BytesToBytesMap.Location loc = map.lookup(
- key,
- LONG_ARRAY_OFFSET,
- KEY_LENGTH
- );
- Assert.assertFalse(loc.isDefined());
- loc.putNewKey(
- key,
- LONG_ARRAY_OFFSET,
- KEY_LENGTH,
- value,
- LONG_ARRAY_OFFSET,
- VALUE_LENGTH
- );
- }
- Assert.assertEquals(2, map.getNumDataPages());
-
- final java.util.BitSet valuesSeen = new java.util.BitSet(NUM_ENTRIES);
- final Iterator<BytesToBytesMap.Location> iter = map.iterator();
- final long key[] = new long[KEY_LENGTH / 8];
- final long value[] = new long[VALUE_LENGTH / 8];
- while (iter.hasNext()) {
- final BytesToBytesMap.Location loc = iter.next();
- Assert.assertTrue(loc.isDefined());
- Assert.assertEquals(KEY_LENGTH, loc.getKeyLength());
- Assert.assertEquals(VALUE_LENGTH, loc.getValueLength());
- PlatformDependent.copyMemory(
- loc.getKeyAddress().getBaseObject(),
- loc.getKeyAddress().getBaseOffset(),
- key,
- LONG_ARRAY_OFFSET,
- KEY_LENGTH
- );
- PlatformDependent.copyMemory(
- loc.getValueAddress().getBaseObject(),
- loc.getValueAddress().getBaseOffset(),
- value,
- LONG_ARRAY_OFFSET,
- VALUE_LENGTH
- );
- for (long j : key) {
- Assert.assertEquals(key[0], j);
- }
- for (long j : value) {
- Assert.assertEquals(key[0], j);
- }
- valuesSeen.set((int) key[0]);
- }
- Assert.assertEquals(NUM_ENTRIES, valuesSeen.cardinality());
- } finally {
- map.free();
- }
- }
-
- @Test
- public void randomizedStressTest() {
- final int size = 65536;
- // Java arrays' hashCodes() aren't based on the arrays' contents, so we need to wrap arrays
- // into ByteBuffers in order to use them as keys here.
- final Map<ByteBuffer, byte[]> expected = new HashMap<ByteBuffer, byte[]>();
- final BytesToBytesMap map = new BytesToBytesMap(memoryManager, size, PAGE_SIZE_BYTES);
-
- try {
- // Fill the map to 90% full so that we can trigger probing
- for (int i = 0; i < size * 0.9; i++) {
- final byte[] key = getRandomByteArray(rand.nextInt(256) + 1);
- final byte[] value = getRandomByteArray(rand.nextInt(512) + 1);
- if (!expected.containsKey(ByteBuffer.wrap(key))) {
- expected.put(ByteBuffer.wrap(key), value);
- final BytesToBytesMap.Location loc = map.lookup(
- key,
- BYTE_ARRAY_OFFSET,
- key.length
- );
- Assert.assertFalse(loc.isDefined());
- loc.putNewKey(
- key,
- BYTE_ARRAY_OFFSET,
- key.length,
- value,
- BYTE_ARRAY_OFFSET,
- value.length
- );
- // After calling putNewKey, the following should be true, even before calling
- // lookup():
- Assert.assertTrue(loc.isDefined());
- Assert.assertEquals(key.length, loc.getKeyLength());
- Assert.assertEquals(value.length, loc.getValueLength());
- Assert.assertTrue(arrayEquals(key, loc.getKeyAddress(), key.length));
- Assert.assertTrue(arrayEquals(value, loc.getValueAddress(), value.length));
- }
- }
-
- for (Map.Entry<ByteBuffer, byte[]> entry : expected.entrySet()) {
- final byte[] key = entry.getKey().array();
- final byte[] value = entry.getValue();
- final BytesToBytesMap.Location loc = map.lookup(key, BYTE_ARRAY_OFFSET, key.length);
- Assert.assertTrue(loc.isDefined());
- Assert.assertTrue(arrayEquals(key, loc.getKeyAddress(), loc.getKeyLength()));
- Assert.assertTrue(arrayEquals(value, loc.getValueAddress(), loc.getValueLength()));
- }
- } finally {
- map.free();
- }
- }
-
- @Test
- public void initialCapacityBoundsChecking() {
- try {
- new BytesToBytesMap(sizeLimitedMemoryManager, 0, PAGE_SIZE_BYTES);
- Assert.fail("Expected IllegalArgumentException to be thrown");
- } catch (IllegalArgumentException e) {
- // expected exception
- }
-
- try {
- new BytesToBytesMap(
- sizeLimitedMemoryManager, BytesToBytesMap.MAX_CAPACITY + 1, PAGE_SIZE_BYTES);
- Assert.fail("Expected IllegalArgumentException to be thrown");
- } catch (IllegalArgumentException e) {
- // expected exception
- }
-
- // Can allocate _at_ the max capacity
- BytesToBytesMap map =
- new BytesToBytesMap(sizeLimitedMemoryManager, BytesToBytesMap.MAX_CAPACITY, PAGE_SIZE_BYTES);
- map.free();
- }
-
- @Test
- public void resizingLargeMap() {
- // As long as a map's capacity is below the max, we should be able to resize up to the max
- BytesToBytesMap map = new BytesToBytesMap(
- sizeLimitedMemoryManager, BytesToBytesMap.MAX_CAPACITY - 64, PAGE_SIZE_BYTES);
- map.growAndRehash();
- map.free();
- }
-}
diff --git a/unsafe/src/test/java/org/apache/spark/unsafe/map/BytesToBytesMapOffHeapSuite.java b/unsafe/src/test/java/org/apache/spark/unsafe/map/BytesToBytesMapOffHeapSuite.java
deleted file mode 100644
index 5a10de49f5..0000000000
--- a/unsafe/src/test/java/org/apache/spark/unsafe/map/BytesToBytesMapOffHeapSuite.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.unsafe.map;
-
-import org.apache.spark.unsafe.memory.MemoryAllocator;
-
-public class BytesToBytesMapOffHeapSuite extends AbstractBytesToBytesMapSuite {
-
- @Override
- protected MemoryAllocator getMemoryAllocator() {
- return MemoryAllocator.UNSAFE;
- }
-
-}
diff --git a/unsafe/src/test/java/org/apache/spark/unsafe/map/BytesToBytesMapOnHeapSuite.java b/unsafe/src/test/java/org/apache/spark/unsafe/map/BytesToBytesMapOnHeapSuite.java
deleted file mode 100644
index 12cc9b25d9..0000000000
--- a/unsafe/src/test/java/org/apache/spark/unsafe/map/BytesToBytesMapOnHeapSuite.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.unsafe.map;
-
-import org.apache.spark.unsafe.memory.MemoryAllocator;
-
-public class BytesToBytesMapOnHeapSuite extends AbstractBytesToBytesMapSuite {
-
- @Override
- protected MemoryAllocator getMemoryAllocator() {
- return MemoryAllocator.HEAP;
- }
-
-}