diff options
author | Davies Liu <davies@databricks.com> | 2015-08-12 21:26:00 -0700 |
---|---|---|
committer | Reynold Xin <rxin@databricks.com> | 2015-08-12 21:26:00 -0700 |
commit | a8ab2634c1eee143a4deaf309204df8add727f9e (patch) | |
tree | dccc4cfeed6be61aff6dbad37fdd2f088152d8c3 /core | |
parent | 2278219054314f1d31ffc358a59aa5067f9f5de9 (diff) | |
download | spark-a8ab2634c1eee143a4deaf309204df8add727f9e.tar.gz spark-a8ab2634c1eee143a4deaf309204df8add727f9e.tar.bz2 spark-a8ab2634c1eee143a4deaf309204df8add727f9e.zip |
[SPARK-9832] [SQL] add a thread-safe lookup for BytesToBytseMap
This patch add a thread-safe lookup for BytesToBytseMap, and use that in broadcasted HashedRelation.
Author: Davies Liu <davies@databricks.com>
Closes #8151 from davies/safeLookup.
Diffstat (limited to 'core')
-rw-r--r-- | core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java | 30 |
1 files changed, 22 insertions, 8 deletions
diff --git a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java index 87ed47e88c..5f3a4fcf4d 100644 --- a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java +++ b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java @@ -17,25 +17,24 @@ package org.apache.spark.unsafe.map; -import java.lang.Override; -import java.lang.UnsupportedOperationException; +import javax.annotation.Nullable; import java.util.Iterator; import java.util.LinkedList; import java.util.List; -import javax.annotation.Nullable; - import com.google.common.annotations.VisibleForTesting; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.spark.shuffle.ShuffleMemoryManager; -import org.apache.spark.unsafe.*; +import org.apache.spark.unsafe.Platform; import org.apache.spark.unsafe.array.ByteArrayMethods; import org.apache.spark.unsafe.array.LongArray; import org.apache.spark.unsafe.bitset.BitSet; import org.apache.spark.unsafe.hash.Murmur3_x86_32; -import org.apache.spark.unsafe.memory.*; +import org.apache.spark.unsafe.memory.MemoryBlock; +import org.apache.spark.unsafe.memory.MemoryLocation; +import org.apache.spark.unsafe.memory.TaskMemoryManager; /** * An append-only hash map where keys and values are contiguous regions of bytes. @@ -328,6 +327,20 @@ public final class BytesToBytesMap { Object keyBaseObject, long keyBaseOffset, int keyRowLengthBytes) { + safeLookup(keyBaseObject, keyBaseOffset, keyRowLengthBytes, loc); + return loc; + } + + /** + * Looks up a key, and saves the result in provided `loc`. + * + * This is a thread-safe version of `lookup`, could be used by multiple threads. + */ + public void safeLookup( + Object keyBaseObject, + long keyBaseOffset, + int keyRowLengthBytes, + Location loc) { assert(bitset != null); assert(longArray != null); @@ -343,7 +356,8 @@ public final class BytesToBytesMap { } if (!bitset.isSet(pos)) { // This is a new key. - return loc.with(pos, hashcode, false); + loc.with(pos, hashcode, false); + return; } else { long stored = longArray.get(pos * 2 + 1); if ((int) (stored) == hashcode) { @@ -361,7 +375,7 @@ public final class BytesToBytesMap { keyRowLengthBytes ); if (areEqual) { - return loc; + return; } else { if (enablePerfMetrics) { numHashCollisions++; |