aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorDavies Liu <davies@databricks.com>2015-08-12 21:26:00 -0700
committerReynold Xin <rxin@databricks.com>2015-08-12 21:26:00 -0700
commita8ab2634c1eee143a4deaf309204df8add727f9e (patch)
treedccc4cfeed6be61aff6dbad37fdd2f088152d8c3 /core
parent2278219054314f1d31ffc358a59aa5067f9f5de9 (diff)
downloadspark-a8ab2634c1eee143a4deaf309204df8add727f9e.tar.gz
spark-a8ab2634c1eee143a4deaf309204df8add727f9e.tar.bz2
spark-a8ab2634c1eee143a4deaf309204df8add727f9e.zip
[SPARK-9832] [SQL] add a thread-safe lookup for BytesToBytseMap
This patch add a thread-safe lookup for BytesToBytseMap, and use that in broadcasted HashedRelation. Author: Davies Liu <davies@databricks.com> Closes #8151 from davies/safeLookup.
Diffstat (limited to 'core')
-rw-r--r--core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java30
1 files changed, 22 insertions, 8 deletions
diff --git a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
index 87ed47e88c..5f3a4fcf4d 100644
--- a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
+++ b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
@@ -17,25 +17,24 @@
package org.apache.spark.unsafe.map;
-import java.lang.Override;
-import java.lang.UnsupportedOperationException;
+import javax.annotation.Nullable;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
-import javax.annotation.Nullable;
-
import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.spark.shuffle.ShuffleMemoryManager;
-import org.apache.spark.unsafe.*;
+import org.apache.spark.unsafe.Platform;
import org.apache.spark.unsafe.array.ByteArrayMethods;
import org.apache.spark.unsafe.array.LongArray;
import org.apache.spark.unsafe.bitset.BitSet;
import org.apache.spark.unsafe.hash.Murmur3_x86_32;
-import org.apache.spark.unsafe.memory.*;
+import org.apache.spark.unsafe.memory.MemoryBlock;
+import org.apache.spark.unsafe.memory.MemoryLocation;
+import org.apache.spark.unsafe.memory.TaskMemoryManager;
/**
* An append-only hash map where keys and values are contiguous regions of bytes.
@@ -328,6 +327,20 @@ public final class BytesToBytesMap {
Object keyBaseObject,
long keyBaseOffset,
int keyRowLengthBytes) {
+ safeLookup(keyBaseObject, keyBaseOffset, keyRowLengthBytes, loc);
+ return loc;
+ }
+
+ /**
+ * Looks up a key, and saves the result in provided `loc`.
+ *
+ * This is a thread-safe version of `lookup`, could be used by multiple threads.
+ */
+ public void safeLookup(
+ Object keyBaseObject,
+ long keyBaseOffset,
+ int keyRowLengthBytes,
+ Location loc) {
assert(bitset != null);
assert(longArray != null);
@@ -343,7 +356,8 @@ public final class BytesToBytesMap {
}
if (!bitset.isSet(pos)) {
// This is a new key.
- return loc.with(pos, hashcode, false);
+ loc.with(pos, hashcode, false);
+ return;
} else {
long stored = longArray.get(pos * 2 + 1);
if ((int) (stored) == hashcode) {
@@ -361,7 +375,7 @@ public final class BytesToBytesMap {
keyRowLengthBytes
);
if (areEqual) {
- return loc;
+ return;
} else {
if (enablePerfMetrics) {
numHashCollisions++;