author     Herman van Hovell <hvanhovell@questtec.nl>    2016-03-23 20:51:01 +0100
committer  Herman van Hovell <hvanhovell@questtec.nl>    2016-03-23 20:51:01 +0100
commit     919bf321987712d9143cae3c4e064fcb077ded1f (patch)
tree       0dc2bc80c5d82c9166cfb1d34d65450d5f40e2e1
parent     8c826880f5eaa3221c4e9e7d3fece54e821a0b98 (diff)
[SPARK-13325][SQL] Create a 64-bit hashcode expression
This PR introduces a 64-bit hashcode expression. Such an expression is especially useful for HyperLogLog++ and other probabilistic data structures.

I have implemented xxHash64, a 64-bit hashing algorithm created by Yann Collet and Mathias Westerdahl. This is a high speed (the C implementation runs at memory bandwidth) and high quality hashcode. Like MurmurHash, it exploits both instruction-level parallelism (for speed) and multiplication and rotation techniques (for quality).

The initial results are promising. I have added a code-generated case to the `HashBenchmark`, which yields the following results (running from SBT):

Running benchmark: Hash For simple
  Running case: interpreted version
  Running case: codegen version
  Running case: codegen version 64-bit

Intel(R) Core(TM) i7-4750HQ CPU @ 2.00GHz
Hash For simple:            Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
-------------------------------------------------------------------------------------------
interpreted version               1011 / 1016        132.8           7.5       1.0X
codegen version                   1864 / 1869         72.0          13.9       0.5X
codegen version 64-bit            1614 / 1644         83.2          12.0       0.6X

Running benchmark: Hash For normal
  Running case: interpreted version
  Running case: codegen version
  Running case: codegen version 64-bit

Intel(R) Core(TM) i7-4750HQ CPU @ 2.00GHz
Hash For normal:            Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
-------------------------------------------------------------------------------------------
interpreted version               2467 / 2475          0.9        1176.1       1.0X
codegen version                   2008 / 2115          1.0         957.5       1.2X
codegen version 64-bit             728 /  758          2.9         347.0       3.4X

Running benchmark: Hash For array
  Running case: interpreted version
  Running case: codegen version
  Running case: codegen version 64-bit

Intel(R) Core(TM) i7-4750HQ CPU @ 2.00GHz
Hash For array:             Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
-------------------------------------------------------------------------------------------
interpreted version               1544 / 1707          0.1       11779.6       1.0X
codegen version                   2728 / 2745          0.0       20815.5       0.6X
codegen version 64-bit            2508 / 2549          0.1       19132.8       0.6X

Running benchmark: Hash For map
  Running case: interpreted version
  Running case: codegen version
  Running case: codegen version 64-bit

Intel(R) Core(TM) i7-4750HQ CPU @ 2.00GHz
Hash For map:               Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
-------------------------------------------------------------------------------------------
interpreted version               1819 / 1826          0.0      444014.3       1.0X
codegen version                    183 /  194          0.0       44642.9       9.9X
codegen version 64-bit             173 /  174          0.0       42120.9      10.5X

This shows that the 64-bit hash is consistently faster than the 32-bit MurmurHash in the code-generated cases, and up to 3x (!) faster in the normal case.

I have also added it to HyperLogLog++, where it cuts the processing time of the following code in half:

    val df = sqlContext.range(1 << 25).agg(approxCountDistinct("id"))
    df.explain()
    val t = System.nanoTime()
    df.show()
    val ns = System.nanoTime() - t

    // Before ns: Long = 5821524302
    // After  ns: Long = 2836418963

cc cloud-fan (you have been working on hashcodes) / rxin

Author: Herman van Hovell <hvanhovell@questtec.nl>

Closes #11209 from hvanhovell/xxHash.
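For reference, a minimal sketch of evaluating the new expression directly through Catalyst's internal API (a hypothetical snippet, assuming the spark-catalyst test classpath; `XxHash64` is the case class added in misc.scala below and defaults to a seed of 42L):

    import org.apache.spark.sql.catalyst.expressions.{Literal, XxHash64}

    // The hash of each child is fed in as the seed of the next child,
    // so column order matters.
    val expr = new XxHash64(Seq(Literal(1), Literal("abc")))
    val hash: Long = expr.eval().asInstanceOf[Long]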
-rw-r--r--  sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/XXH64.java    192
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/HyperLogLogPlusPlus.scala    2
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/misc.scala    238
-rw-r--r--  sql/catalyst/src/test/java/org/apache/spark/sql/catalyst/expressions/XXH64Suite.java    166
-rw-r--r--  sql/catalyst/src/test/scala/org/apache/spark/sql/HashBenchmark.scala    64
-rw-r--r--  sql/catalyst/src/test/scala/org/apache/spark/sql/HashByteArrayBenchmark.scala    148
-rw-r--r--  sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MiscFunctionsSuite.scala    13
7 files changed, 713 insertions, 110 deletions
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/XXH64.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/XXH64.java
new file mode 100644
index 0000000000..5f2de266b5
--- /dev/null
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/XXH64.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.catalyst.expressions;
+
+import org.apache.spark.unsafe.Platform;
+
+// scalastyle:off
+
+/**
+ * xxHash64. A high quality and fast 64 bit hash code by Yann Collet and Mathias Westerdahl. The
+ * class below is modelled like its Murmur3_x86_32 cousin.
+ * <p/>
+ * This was largely based on the following (original) C and Java implementations:
+ * https://github.com/Cyan4973/xxHash/blob/master/xxhash.c
+ * https://github.com/OpenHFT/Zero-Allocation-Hashing/blob/master/src/main/java/net/openhft/hashing/XxHash_r39.java
+ * https://github.com/airlift/slice/blob/master/src/main/java/io/airlift/slice/XxHash64.java
+ */
+// scalastyle:on
+public final class XXH64 {
+
+ private static final long PRIME64_1 = 0x9E3779B185EBCA87L;
+ private static final long PRIME64_2 = 0xC2B2AE3D27D4EB4FL;
+ private static final long PRIME64_3 = 0x165667B19E3779F9L;
+ private static final long PRIME64_4 = 0x85EBCA77C2B2AE63L;
+ private static final long PRIME64_5 = 0x27D4EB2F165667C5L;
+
+ private final long seed;
+
+ public XXH64(long seed) {
+ super();
+ this.seed = seed;
+ }
+
+ @Override
+ public String toString() {
+ return "xxHash64(seed=" + seed + ")";
+ }
+
+ public long hashInt(int input) {
+ return hashInt(input, seed);
+ }
+
+ public static long hashInt(int input, long seed) {
+ long hash = seed + PRIME64_5 + 4L;
+ hash ^= (input & 0xFFFFFFFFL) * PRIME64_1;
+ hash = Long.rotateLeft(hash, 23) * PRIME64_2 + PRIME64_3;
+ return fmix(hash);
+ }
+
+ public long hashLong(long input) {
+ return hashLong(input, seed);
+ }
+
+ public static long hashLong(long input, long seed) {
+ long hash = seed + PRIME64_5 + 8L;
+ hash ^= Long.rotateLeft(input * PRIME64_2, 31) * PRIME64_1;
+ hash = Long.rotateLeft(hash, 27) * PRIME64_1 + PRIME64_4;
+ return fmix(hash);
+ }
+
+ public long hashUnsafeWords(Object base, long offset, int length) {
+ return hashUnsafeWords(base, offset, length, seed);
+ }
+
+ public static long hashUnsafeWords(Object base, long offset, int length, long seed) {
+ assert (length % 8 == 0) : "lengthInBytes must be a multiple of 8 (word-aligned)";
+ long hash = hashBytesByWords(base, offset, length, seed);
+ return fmix(hash);
+ }
+
+ public long hashUnsafeBytes(Object base, long offset, int length) {
+ return hashUnsafeBytes(base, offset, length, seed);
+ }
+
+ public static long hashUnsafeBytes(Object base, long offset, int length, long seed) {
+ assert (length >= 0) : "lengthInBytes cannot be negative";
+ long hash = hashBytesByWords(base, offset, length, seed);
+ long end = offset + length;
+ offset += length & -8;
+
+ if (offset + 4L <= end) {
+ hash ^= (Platform.getInt(base, offset) & 0xFFFFFFFFL) * PRIME64_1;
+ hash = Long.rotateLeft(hash, 23) * PRIME64_2 + PRIME64_3;
+ offset += 4L;
+ }
+
+ while (offset < end) {
+ hash ^= (Platform.getByte(base, offset) & 0xFFL) * PRIME64_5;
+ hash = Long.rotateLeft(hash, 11) * PRIME64_1;
+ offset++;
+ }
+ return fmix(hash);
+ }
+
+ private static long fmix(long hash) {
+ hash ^= hash >>> 33;
+ hash *= PRIME64_2;
+ hash ^= hash >>> 29;
+ hash *= PRIME64_3;
+ hash ^= hash >>> 32;
+ return hash;
+ }
+
+ private static long hashBytesByWords(Object base, long offset, int length, long seed) {
+ long end = offset + length;
+ long hash;
+ if (length >= 32) {
+ long limit = end - 32;
+ long v1 = seed + PRIME64_1 + PRIME64_2;
+ long v2 = seed + PRIME64_2;
+ long v3 = seed;
+ long v4 = seed - PRIME64_1;
+
+ do {
+ v1 += Platform.getLong(base, offset) * PRIME64_2;
+ v1 = Long.rotateLeft(v1, 31);
+ v1 *= PRIME64_1;
+
+ v2 += Platform.getLong(base, offset + 8) * PRIME64_2;
+ v2 = Long.rotateLeft(v2, 31);
+ v2 *= PRIME64_1;
+
+ v3 += Platform.getLong(base, offset + 16) * PRIME64_2;
+ v3 = Long.rotateLeft(v3, 31);
+ v3 *= PRIME64_1;
+
+ v4 += Platform.getLong(base, offset + 24) * PRIME64_2;
+ v4 = Long.rotateLeft(v4, 31);
+ v4 *= PRIME64_1;
+
+ offset += 32L;
+ } while (offset <= limit);
+
+ hash = Long.rotateLeft(v1, 1)
+ + Long.rotateLeft(v2, 7)
+ + Long.rotateLeft(v3, 12)
+ + Long.rotateLeft(v4, 18);
+
+ v1 *= PRIME64_2;
+ v1 = Long.rotateLeft(v1, 31);
+ v1 *= PRIME64_1;
+ hash ^= v1;
+ hash = hash * PRIME64_1 + PRIME64_4;
+
+ v2 *= PRIME64_2;
+ v2 = Long.rotateLeft(v2, 31);
+ v2 *= PRIME64_1;
+ hash ^= v2;
+ hash = hash * PRIME64_1 + PRIME64_4;
+
+ v3 *= PRIME64_2;
+ v3 = Long.rotateLeft(v3, 31);
+ v3 *= PRIME64_1;
+ hash ^= v3;
+ hash = hash * PRIME64_1 + PRIME64_4;
+
+ v4 *= PRIME64_2;
+ v4 = Long.rotateLeft(v4, 31);
+ v4 *= PRIME64_1;
+ hash ^= v4;
+ hash = hash * PRIME64_1 + PRIME64_4;
+ } else {
+ hash = seed + PRIME64_5;
+ }
+
+ hash += length;
+
+ long limit = end - 8;
+ while (offset <= limit) {
+ long k1 = Platform.getLong(base, offset);
+ hash ^= Long.rotateLeft(k1 * PRIME64_2, 31) * PRIME64_1;
+ hash = Long.rotateLeft(hash, 27) * PRIME64_1 + PRIME64_4;
+ offset += 8L;
+ }
+ return hash;
+ }
+}
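The static methods above can also be driven directly from Scala; a small sketch (assuming spark-catalyst and spark-unsafe on the classpath), mirroring how HashByteArrayBenchmark below calls the hasher:

    import org.apache.spark.sql.catalyst.expressions.XXH64
    import org.apache.spark.unsafe.Platform

    val seed = 42L
    val h1 = XXH64.hashInt(1, seed)    // fixed-width 4-byte input
    val h2 = XXH64.hashLong(1L, seed)  // fixed-width 8-byte input
    val bytes = "hello".getBytes("UTF-8")
    val h3 = XXH64.hashUnsafeBytes(bytes, Platform.BYTE_ARRAY_OFFSET, bytes.length, seed)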
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/HyperLogLogPlusPlus.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/HyperLogLogPlusPlus.scala
index 32bae13360..b6bd56cff6 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/HyperLogLogPlusPlus.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/HyperLogLogPlusPlus.scala
@@ -169,7 +169,7 @@ case class HyperLogLogPlusPlus(
val v = child.eval(input)
if (v != null) {
// Create the hashed value 'x'.
- val x = MurmurHash.hash64(v)
+ val x = XxHash64Function.hash(v, child.dataType, 42L)
// Determine the index of the register we are going to use.
val idx = (x >>> idxShift).toInt
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/misc.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/misc.scala
index 8f260ad151..e8a3e129b4 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/misc.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/misc.scala
@@ -185,6 +185,7 @@ case class Crc32(child: Expression) extends UnaryExpression with ImplicitCastInp
}
}
+
/**
* A function that calculates hash value for a group of expressions. Note that the `seed` argument
* is not exposed to users and should only be set inside spark SQL.
@@ -213,14 +214,10 @@ case class Crc32(child: Expression) extends UnaryExpression with ImplicitCastInp
* `result`.
*
* Finally we aggregate the hash values for each expression by the same way of struct.
- *
- * We should use this hash function for both shuffle and bucket, so that we can guarantee shuffle
- * and bucketing have same data distribution.
*/
-case class Murmur3Hash(children: Seq[Expression], seed: Int) extends Expression {
- def this(arguments: Seq[Expression]) = this(arguments, 42)
-
- override def dataType: DataType = IntegerType
+abstract class HashExpression[E] extends Expression {
+ /** Seed of the HashExpression. */
+ val seed: E
override def foldable: Boolean = children.forall(_.foldable)
@@ -234,8 +231,6 @@ case class Murmur3Hash(children: Seq[Expression], seed: Int) extends Expression
}
}
- override def prettyName: String = "hash"
-
override def eval(input: InternalRow): Any = {
var hash = seed
var i = 0
@@ -247,80 +242,7 @@ case class Murmur3Hash(children: Seq[Expression], seed: Int) extends Expression
hash
}
- private def computeHash(value: Any, dataType: DataType, seed: Int): Int = {
- def hashInt(i: Int): Int = Murmur3_x86_32.hashInt(i, seed)
- def hashLong(l: Long): Int = Murmur3_x86_32.hashLong(l, seed)
-
- value match {
- case null => seed
- case b: Boolean => hashInt(if (b) 1 else 0)
- case b: Byte => hashInt(b)
- case s: Short => hashInt(s)
- case i: Int => hashInt(i)
- case l: Long => hashLong(l)
- case f: Float => hashInt(java.lang.Float.floatToIntBits(f))
- case d: Double => hashLong(java.lang.Double.doubleToLongBits(d))
- case d: Decimal =>
- val precision = dataType.asInstanceOf[DecimalType].precision
- if (precision <= Decimal.MAX_LONG_DIGITS) {
- hashLong(d.toUnscaledLong)
- } else {
- val bytes = d.toJavaBigDecimal.unscaledValue().toByteArray
- Murmur3_x86_32.hashUnsafeBytes(bytes, Platform.BYTE_ARRAY_OFFSET, bytes.length, seed)
- }
- case c: CalendarInterval => Murmur3_x86_32.hashInt(c.months, hashLong(c.microseconds))
- case a: Array[Byte] =>
- Murmur3_x86_32.hashUnsafeBytes(a, Platform.BYTE_ARRAY_OFFSET, a.length, seed)
- case s: UTF8String =>
- Murmur3_x86_32.hashUnsafeBytes(s.getBaseObject, s.getBaseOffset, s.numBytes(), seed)
-
- case array: ArrayData =>
- val elementType = dataType match {
- case udt: UserDefinedType[_] => udt.sqlType.asInstanceOf[ArrayType].elementType
- case ArrayType(et, _) => et
- }
- var result = seed
- var i = 0
- while (i < array.numElements()) {
- result = computeHash(array.get(i, elementType), elementType, result)
- i += 1
- }
- result
-
- case map: MapData =>
- val (kt, vt) = dataType match {
- case udt: UserDefinedType[_] =>
- val mapType = udt.sqlType.asInstanceOf[MapType]
- mapType.keyType -> mapType.valueType
- case MapType(kt, vt, _) => kt -> vt
- }
- val keys = map.keyArray()
- val values = map.valueArray()
- var result = seed
- var i = 0
- while (i < map.numElements()) {
- result = computeHash(keys.get(i, kt), kt, result)
- result = computeHash(values.get(i, vt), vt, result)
- i += 1
- }
- result
-
- case struct: InternalRow =>
- val types: Array[DataType] = dataType match {
- case udt: UserDefinedType[_] =>
- udt.sqlType.asInstanceOf[StructType].map(_.dataType).toArray
- case StructType(fields) => fields.map(_.dataType)
- }
- var result = seed
- var i = 0
- val len = struct.numFields
- while (i < len) {
- result = computeHash(struct.get(i, types(i)), types(i), result)
- i += 1
- }
- result
- }
- }
+ protected def computeHash(value: Any, dataType: DataType, seed: E): E
override def genCode(ctx: CodegenContext, ev: ExprCode): String = {
ev.isNull = "false"
@@ -332,7 +254,7 @@ case class Murmur3Hash(children: Seq[Expression], seed: Int) extends Expression
}.mkString("\n")
s"""
- int ${ev.value} = $seed;
+ ${ctx.javaType(dataType)} ${ev.value} = $seed;
$childrenHash
"""
}
@@ -360,7 +282,7 @@ case class Murmur3Hash(children: Seq[Expression], seed: Int) extends Expression
dataType: DataType,
result: String,
ctx: CodegenContext): String = {
- val hasher = classOf[Murmur3_x86_32].getName
+ val hasher = hasherClassName
def hashInt(i: String): String = s"$result = $hasher.hashInt($i, $result);"
def hashLong(l: String): String = s"$result = $hasher.hashLong($l, $result);"
@@ -423,6 +345,125 @@ case class Murmur3Hash(children: Seq[Expression], seed: Int) extends Expression
case udt: UserDefinedType[_] => computeHash(input, udt.sqlType, result, ctx)
}
}
+
+ protected def hasherClassName: String
+}
+
+/**
+ * Base class for interpreted hash functions.
+ */
+abstract class InterpretedHashFunction {
+ protected def hashInt(i: Int, seed: Long): Long
+
+ protected def hashLong(l: Long, seed: Long): Long
+
+ protected def hashUnsafeBytes(base: AnyRef, offset: Long, length: Int, seed: Long): Long
+
+ def hash(value: Any, dataType: DataType, seed: Long): Long = {
+ value match {
+ case null => seed
+ case b: Boolean => hashInt(if (b) 1 else 0, seed)
+ case b: Byte => hashInt(b, seed)
+ case s: Short => hashInt(s, seed)
+ case i: Int => hashInt(i, seed)
+ case l: Long => hashLong(l, seed)
+ case f: Float => hashInt(java.lang.Float.floatToIntBits(f), seed)
+ case d: Double => hashLong(java.lang.Double.doubleToLongBits(d), seed)
+ case d: Decimal =>
+ val precision = dataType.asInstanceOf[DecimalType].precision
+ if (precision <= Decimal.MAX_LONG_DIGITS) {
+ hashLong(d.toUnscaledLong, seed)
+ } else {
+ val bytes = d.toJavaBigDecimal.unscaledValue().toByteArray
+ hashUnsafeBytes(bytes, Platform.BYTE_ARRAY_OFFSET, bytes.length, seed)
+ }
+ case c: CalendarInterval => hashInt(c.months, hashLong(c.microseconds, seed))
+ case a: Array[Byte] =>
+ hashUnsafeBytes(a, Platform.BYTE_ARRAY_OFFSET, a.length, seed)
+ case s: UTF8String =>
+ hashUnsafeBytes(s.getBaseObject, s.getBaseOffset, s.numBytes(), seed)
+
+ case array: ArrayData =>
+ val elementType = dataType match {
+ case udt: UserDefinedType[_] => udt.sqlType.asInstanceOf[ArrayType].elementType
+ case ArrayType(et, _) => et
+ }
+ var result = seed
+ var i = 0
+ while (i < array.numElements()) {
+ result = hash(array.get(i, elementType), elementType, result)
+ i += 1
+ }
+ result
+
+ case map: MapData =>
+ val (kt, vt) = dataType match {
+ case udt: UserDefinedType[_] =>
+ val mapType = udt.sqlType.asInstanceOf[MapType]
+ mapType.keyType -> mapType.valueType
+ case MapType(kt, vt, _) => kt -> vt
+ }
+ val keys = map.keyArray()
+ val values = map.valueArray()
+ var result = seed
+ var i = 0
+ while (i < map.numElements()) {
+ result = hash(keys.get(i, kt), kt, result)
+ result = hash(values.get(i, vt), vt, result)
+ i += 1
+ }
+ result
+
+ case struct: InternalRow =>
+ val types: Array[DataType] = dataType match {
+ case udt: UserDefinedType[_] =>
+ udt.sqlType.asInstanceOf[StructType].map(_.dataType).toArray
+ case StructType(fields) => fields.map(_.dataType)
+ }
+ var result = seed
+ var i = 0
+ val len = struct.numFields
+ while (i < len) {
+ result = hash(struct.get(i, types(i)), types(i), result)
+ i += 1
+ }
+ result
+ }
+ }
+}
+
+/**
+ * A Murmur3 hash expression.
+ *
+ * We should use this hash function for both shuffle and bucketing, so that we can guarantee
+ * that shuffle and bucketing have the same data distribution.
+ */
+case class Murmur3Hash(children: Seq[Expression], seed: Int) extends HashExpression[Int] {
+ def this(arguments: Seq[Expression]) = this(arguments, 42)
+
+ override def dataType: DataType = IntegerType
+
+ override def prettyName: String = "hash"
+
+ override protected def hasherClassName: String = classOf[Murmur3_x86_32].getName
+
+ override protected def computeHash(value: Any, dataType: DataType, seed: Int): Int = {
+ Murmur3HashFunction.hash(value, dataType, seed).toInt
+ }
+}
+
+object Murmur3HashFunction extends InterpretedHashFunction {
+ override protected def hashInt(i: Int, seed: Long): Long = {
+ Murmur3_x86_32.hashInt(i, seed.toInt)
+ }
+
+ override protected def hashLong(l: Long, seed: Long): Long = {
+ Murmur3_x86_32.hashLong(l, seed.toInt)
+ }
+
+ override protected def hashUnsafeBytes(base: AnyRef, offset: Long, len: Int, seed: Long): Long = {
+ Murmur3_x86_32.hashUnsafeBytes(base, offset, len, seed.toInt)
+ }
}
/**
@@ -442,3 +483,30 @@ case class PrintToStderr(child: Expression) extends UnaryExpression {
""".stripMargin)
}
}
+
+/**
+ * An xxHash64 64-bit hash expression.
+ */
+case class XxHash64(children: Seq[Expression], seed: Long) extends HashExpression[Long] {
+ def this(arguments: Seq[Expression]) = this(arguments, 42L)
+
+ override def dataType: DataType = LongType
+
+ override def prettyName: String = "xxHash"
+
+ override protected def hasherClassName: String = classOf[XXH64].getName
+
+ override protected def computeHash(value: Any, dataType: DataType, seed: Long): Long = {
+ XxHash64Function.hash(value, dataType, seed)
+ }
+}
+
+object XxHash64Function extends InterpretedHashFunction {
+ override protected def hashInt(i: Int, seed: Long): Long = XXH64.hashInt(i, seed)
+
+ override protected def hashLong(l: Long, seed: Long): Long = XXH64.hashLong(l, seed)
+
+ override protected def hashUnsafeBytes(base: AnyRef, offset: Long, len: Int, seed: Long): Long = {
+ XXH64.hashUnsafeBytes(base, offset, len, seed)
+ }
+}
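As a sketch of the seed chaining documented on HashExpression (hypothetical values, assuming the spark-catalyst classpath): hashing two fields through the interpreted XxHash64Function, with the first hash used as the seed of the second, matches evaluating the XxHash64 expression over the same values:

    import org.apache.spark.sql.catalyst.expressions.XxHash64Function
    import org.apache.spark.sql.types.{IntegerType, LongType}

    val seed = 42L
    val h1 = XxHash64Function.hash(1, IntegerType, seed) // first field
    val h2 = XxHash64Function.hash(2L, LongType, h1)     // previous hash becomes the seed
    // h2 == XxHash64(Seq(Literal(1), Literal(2L)), seed).eval()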
diff --git a/sql/catalyst/src/test/java/org/apache/spark/sql/catalyst/expressions/XXH64Suite.java b/sql/catalyst/src/test/java/org/apache/spark/sql/catalyst/expressions/XXH64Suite.java
new file mode 100644
index 0000000000..711887f028
--- /dev/null
+++ b/sql/catalyst/src/test/java/org/apache/spark/sql/catalyst/expressions/XXH64Suite.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions;
+
+import java.nio.ByteOrder;
+import java.nio.charset.StandardCharsets;
+import java.util.HashSet;
+import java.util.Random;
+import java.util.Set;
+
+import org.apache.spark.unsafe.Platform;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Test the XXH64 function.
+ * <p/>
+ * Test constants were taken from the original implementation and the airlift/slice implementation.
+ */
+public class XXH64Suite {
+
+ private static final XXH64 hasher = new XXH64(0);
+
+ private static final int SIZE = 101;
+ private static final long PRIME = 2654435761L;
+ private static final byte[] BUFFER = new byte[SIZE];
+ private static final int TEST_INT = 0x4B1FFF9E; // First 4 bytes in the buffer
+ private static final long TEST_LONG = 0xDD2F535E4B1FFF9EL; // First 8 bytes in the buffer
+
+ /* Create the test data. */
+ static {
+ long seed = PRIME;
+ for (int i = 0; i < SIZE; i++) {
+ BUFFER[i] = (byte) (seed >> 24);
+ seed *= seed;
+ }
+ }
+
+ @Test
+ public void testKnownIntegerInputs() {
+ Assert.assertEquals(0x9256E58AA397AEF1L, hasher.hashInt(TEST_INT));
+ Assert.assertEquals(0x9D5FFDFB928AB4BL, XXH64.hashInt(TEST_INT, PRIME));
+ }
+
+ @Test
+ public void testKnownLongInputs() {
+ Assert.assertEquals(0xF74CB1451B32B8CFL, hasher.hashLong(TEST_LONG));
+ Assert.assertEquals(0x9C44B77FBCC302C5L, XXH64.hashLong(TEST_LONG, PRIME));
+ }
+
+ @Test
+ public void testKnownByteArrayInputs() {
+ Assert.assertEquals(0xEF46DB3751D8E999L,
+ hasher.hashUnsafeBytes(BUFFER, Platform.BYTE_ARRAY_OFFSET, 0));
+ Assert.assertEquals(0xAC75FDA2929B17EFL,
+ XXH64.hashUnsafeBytes(BUFFER, Platform.BYTE_ARRAY_OFFSET, 0, PRIME));
+ Assert.assertEquals(0x4FCE394CC88952D8L,
+ hasher.hashUnsafeBytes(BUFFER, Platform.BYTE_ARRAY_OFFSET, 1));
+ Assert.assertEquals(0x739840CB819FA723L,
+ XXH64.hashUnsafeBytes(BUFFER, Platform.BYTE_ARRAY_OFFSET, 1, PRIME));
+
+ // These tests currently fail in a big endian environment because the test data and expected
+ // answers are generated with little endian assumptions. We could revisit this when Platform
+ // becomes endian aware.
+ if (ByteOrder.nativeOrder() == ByteOrder.LITTLE_ENDIAN) {
+ Assert.assertEquals(0x9256E58AA397AEF1L,
+ hasher.hashUnsafeBytes(BUFFER, Platform.BYTE_ARRAY_OFFSET, 4));
+ Assert.assertEquals(0x9D5FFDFB928AB4BL,
+ XXH64.hashUnsafeBytes(BUFFER, Platform.BYTE_ARRAY_OFFSET, 4, PRIME));
+ Assert.assertEquals(0xF74CB1451B32B8CFL,
+ hasher.hashUnsafeBytes(BUFFER, Platform.BYTE_ARRAY_OFFSET, 8));
+ Assert.assertEquals(0x9C44B77FBCC302C5L,
+ XXH64.hashUnsafeBytes(BUFFER, Platform.BYTE_ARRAY_OFFSET, 8, PRIME));
+ Assert.assertEquals(0xCFFA8DB881BC3A3DL,
+ hasher.hashUnsafeBytes(BUFFER, Platform.BYTE_ARRAY_OFFSET, 14));
+ Assert.assertEquals(0x5B9611585EFCC9CBL,
+ XXH64.hashUnsafeBytes(BUFFER, Platform.BYTE_ARRAY_OFFSET, 14, PRIME));
+ Assert.assertEquals(0x0EAB543384F878ADL,
+ hasher.hashUnsafeBytes(BUFFER, Platform.BYTE_ARRAY_OFFSET, SIZE));
+ Assert.assertEquals(0xCAA65939306F1E21L,
+ XXH64.hashUnsafeBytes(BUFFER, Platform.BYTE_ARRAY_OFFSET, SIZE, PRIME));
+ }
+ }
+
+ @Test
+ public void randomizedStressTest() {
+ int size = 65536;
+ Random rand = new Random();
+
+ // A set used to track collision rate.
+ Set<Long> hashcodes = new HashSet<>();
+ for (int i = 0; i < size; i++) {
+ int vint = rand.nextInt();
+ long lint = rand.nextLong();
+ Assert.assertEquals(hasher.hashInt(vint), hasher.hashInt(vint));
+ Assert.assertEquals(hasher.hashLong(lint), hasher.hashLong(lint));
+
+ hashcodes.add(hasher.hashLong(lint));
+ }
+
+ // A very loose bound.
+ Assert.assertTrue(hashcodes.size() > size * 0.95d);
+ }
+
+ @Test
+ public void randomizedStressTestBytes() {
+ int size = 65536;
+ Random rand = new Random();
+
+ // A set used to track collision rate.
+ Set<Long> hashcodes = new HashSet<>();
+ for (int i = 0; i < size; i++) {
+ int byteArrSize = rand.nextInt(100) * 8;
+ byte[] bytes = new byte[byteArrSize];
+ rand.nextBytes(bytes);
+
+ Assert.assertEquals(
+ hasher.hashUnsafeWords(bytes, Platform.BYTE_ARRAY_OFFSET, byteArrSize),
+ hasher.hashUnsafeWords(bytes, Platform.BYTE_ARRAY_OFFSET, byteArrSize));
+
+ hashcodes.add(hasher.hashUnsafeWords(
+ bytes, Platform.BYTE_ARRAY_OFFSET, byteArrSize));
+ }
+
+ // A very loose bound.
+ Assert.assertTrue(hashcodes.size() > size * 0.95d);
+ }
+
+ @Test
+ public void randomizedStressTestPaddedStrings() {
+ int size = 64000;
+ // A set used to track collision rate.
+ Set<Long> hashcodes = new HashSet<>();
+ for (int i = 0; i < size; i++) {
+ int byteArrSize = 8;
+ byte[] strBytes = String.valueOf(i).getBytes(StandardCharsets.UTF_8);
+ byte[] paddedBytes = new byte[byteArrSize];
+ System.arraycopy(strBytes, 0, paddedBytes, 0, strBytes.length);
+
+ Assert.assertEquals(
+ hasher.hashUnsafeWords(paddedBytes, Platform.BYTE_ARRAY_OFFSET, byteArrSize),
+ hasher.hashUnsafeWords(paddedBytes, Platform.BYTE_ARRAY_OFFSET, byteArrSize));
+
+ hashcodes.add(hasher.hashUnsafeWords(
+ paddedBytes, Platform.BYTE_ARRAY_OFFSET, byteArrSize));
+ }
+
+ // A very loose bound.
+ Assert.assertTrue(hashcodes.size() > size * 0.95d);
+ }
+}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/HashBenchmark.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/HashBenchmark.scala
index 5a929f211a..c6a1a2be0d 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/HashBenchmark.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/HashBenchmark.scala
@@ -18,14 +18,14 @@
package org.apache.spark.sql
import org.apache.spark.sql.catalyst.encoders.RowEncoder
-import org.apache.spark.sql.catalyst.expressions.{Murmur3Hash, UnsafeProjection}
+import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateSafeProjection
import org.apache.spark.sql.types._
import org.apache.spark.util.Benchmark
/**
- * Benchmark for the previous interpreted hash function(InternalRow.hashCode) vs the new codegen
- * hash expression(Murmur3Hash).
+ * Benchmark for the previous interpreted hash function (InternalRow.hashCode) vs the codegen
+ * hash expressions (Murmur3Hash/xxHash64).
*/
object HashBenchmark {
@@ -63,19 +63,44 @@ object HashBenchmark {
}
}
}
+
+ val getHashCode64b = UnsafeProjection.create(new XxHash64(attrs) :: Nil, attrs)
+ benchmark.addCase("codegen version 64-bit") { _: Int =>
+ for (_ <- 0L until iters) {
+ var sum = 0
+ var i = 0
+ while (i < numRows) {
+ sum += getHashCode64b(rows(i)).getInt(0)
+ i += 1
+ }
+ }
+ }
+
benchmark.run()
}
def main(args: Array[String]): Unit = {
- val simple = new StructType().add("i", IntegerType)
+ val singleInt = new StructType().add("i", IntegerType)
+ /*
+ Intel(R) Core(TM) i7-4750HQ CPU @ 2.00GHz
+ Hash For single ints: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
+ -------------------------------------------------------------------------------------------
+ interpreted version 1006 / 1011 133.4 7.5 1.0X
+ codegen version 1835 / 1839 73.1 13.7 0.5X
+ codegen version 64-bit 1627 / 1628 82.5 12.1 0.6X
+ */
+ test("single ints", singleInt, 1 << 15, 1 << 14)
+
+ val singleLong = new StructType().add("i", LongType)
/*
- Intel(R) Core(TM) i7-4960HQ CPU @ 2.60GHz
- Hash For simple: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
+ Intel(R) Core(TM) i7-4750HQ CPU @ 2.00GHz
+ Hash For single longs: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
-------------------------------------------------------------------------------------------
- interpreted version 941 / 955 142.6 7.0 1.0X
- codegen version 1737 / 1775 77.3 12.9 0.5X
+ interpreted version 1196 / 1209 112.2 8.9 1.0X
+ codegen version 2178 / 2181 61.6 16.2 0.5X
+ codegen version 64-bit 1752 / 1753 76.6 13.1 0.7X
*/
- test("simple", simple, 1 << 13, 1 << 14)
+ test("single longs", singleLong, 1 << 15, 1 << 14)
val normal = new StructType()
.add("null", NullType)
@@ -93,11 +118,12 @@ object HashBenchmark {
.add("date", DateType)
.add("timestamp", TimestampType)
/*
- Intel(R) Core(TM) i7-4960HQ CPU @ 2.60GHz
+ Intel(R) Core(TM) i7-4750HQ CPU @ 2.00GHz
Hash For normal: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
-------------------------------------------------------------------------------------------
- interpreted version 2209 / 2271 0.9 1053.4 1.0X
- codegen version 1887 / 2018 1.1 899.9 1.2X
+ interpreted version 2713 / 2715 0.8 1293.5 1.0X
+ codegen version 2015 / 2018 1.0 960.9 1.3X
+ codegen version 64-bit 735 / 738 2.9 350.7 3.7X
*/
test("normal", normal, 1 << 10, 1 << 11)
@@ -106,11 +132,12 @@ object HashBenchmark {
.add("array", arrayOfInt)
.add("arrayOfArray", ArrayType(arrayOfInt))
/*
- Intel(R) Core(TM) i7-4960HQ CPU @ 2.60GHz
+ Intel(R) Core(TM) i7-4750HQ CPU @ 2.00GHz
Hash For array: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
-------------------------------------------------------------------------------------------
- interpreted version 1481 / 1529 0.1 11301.7 1.0X
- codegen version 2591 / 2636 0.1 19771.1 0.6X
+ interpreted version 1498 / 1499 0.1 11432.1 1.0X
+ codegen version 2642 / 2643 0.0 20158.4 0.6X
+ codegen version 64-bit 2421 / 2424 0.1 18472.5 0.6X
*/
test("array", array, 1 << 8, 1 << 9)
@@ -119,11 +146,12 @@ object HashBenchmark {
.add("map", mapOfInt)
.add("mapOfMap", MapType(IntegerType, mapOfInt))
/*
- Intel(R) Core(TM) i7-4960HQ CPU @ 2.60GHz
+ Intel(R) Core(TM) i7-4750HQ CPU @ 2.00GHz
Hash For map: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
-------------------------------------------------------------------------------------------
- interpreted version 1820 / 1861 0.0 444347.2 1.0X
- codegen version 205 / 223 0.0 49936.5 8.9X
+ interpreted version 1612 / 1618 0.0 393553.4 1.0X
+ codegen version 149 / 150 0.0 36381.2 10.8X
+ codegen version 64-bit 144 / 145 0.0 35122.1 11.2X
*/
test("map", map, 1 << 6, 1 << 6)
}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/HashByteArrayBenchmark.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/HashByteArrayBenchmark.scala
new file mode 100644
index 0000000000..53f21a8442
--- /dev/null
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/HashByteArrayBenchmark.scala
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import java.util.Random
+
+import org.apache.spark.sql.catalyst.expressions.XXH64
+import org.apache.spark.unsafe.Platform
+import org.apache.spark.unsafe.hash.Murmur3_x86_32
+import org.apache.spark.util.Benchmark
+
+/**
+ * Synthetic benchmark for MurmurHash3 and xxHash64.
+ */
+object HashByteArrayBenchmark {
+ def test(length: Int, seed: Long, numArrays: Int, iters: Int): Unit = {
+ val random = new Random(seed)
+ val arrays = Array.fill[Array[Byte]](numArrays) {
+ val bytes = new Array[Byte](length)
+ random.nextBytes(bytes)
+ bytes
+ }
+
+ val benchmark = new Benchmark("Hash byte arrays with length " + length, iters * numArrays)
+ benchmark.addCase("Murmur3_x86_32") { _: Int =>
+ for (_ <- 0L until iters) {
+ var sum = 0
+ var i = 0
+ while (i < numArrays) {
+ sum += Murmur3_x86_32.hashUnsafeBytes(arrays(i), Platform.BYTE_ARRAY_OFFSET, length, 42)
+ i += 1
+ }
+ }
+ }
+
+ benchmark.addCase("xxHash 64-bit") { _: Int =>
+ for (_ <- 0L until iters) {
+ var sum = 0L
+ var i = 0
+ while (i < numArrays) {
+ sum += XXH64.hashUnsafeBytes(arrays(i), Platform.BYTE_ARRAY_OFFSET, length, 42)
+ i += 1
+ }
+ }
+ }
+
+ benchmark.run()
+ }
+
+ def main(args: Array[String]): Unit = {
+ /*
+ Intel(R) Core(TM) i7-4750HQ CPU @ 2.00GHz
+ Hash byte arrays with length 8: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
+ -------------------------------------------------------------------------------------------
+ Murmur3_x86_32 11 / 12 185.1 5.4 1.0X
+ xxHash 64-bit 17 / 18 120.0 8.3 0.6X
+ */
+ test(8, 42L, 1 << 10, 1 << 11)
+
+ /*
+ Intel(R) Core(TM) i7-4750HQ CPU @ 2.00GHz
+ Hash byte arrays with length 16: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
+ -------------------------------------------------------------------------------------------
+ Murmur3_x86_32 18 / 18 118.6 8.4 1.0X
+ xxHash 64-bit 20 / 21 102.5 9.8 0.9X
+ */
+ test(16, 42L, 1 << 10, 1 << 11)
+
+ /*
+ Intel(R) Core(TM) i7-4750HQ CPU @ 2.00GHz
+ Hash byte arrays with length 24: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
+ -------------------------------------------------------------------------------------------
+ Murmur3_x86_32 24 / 24 86.6 11.5 1.0X
+ xxHash 64-bit 23 / 23 93.2 10.7 1.1X
+ */
+ test(24, 42L, 1 << 10, 1 << 11)
+
+ // Add 31 to the array lengths to create worst-case alignment for xxHash.
+ /*
+ Intel(R) Core(TM) i7-4750HQ CPU @ 2.00GHz
+ Hash byte arrays with length 31: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
+ -------------------------------------------------------------------------------------------
+ Murmur3_x86_32 38 / 39 54.7 18.3 1.0X
+ xxHash 64-bit 33 / 33 64.4 15.5 1.2X
+ */
+ test(31, 42L, 1 << 10, 1 << 11)
+
+ /*
+ Intel(R) Core(TM) i7-4750HQ CPU @ 2.00GHz
+ Hash byte arrays with length 95: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
+ -------------------------------------------------------------------------------------------
+ Murmur3_x86_32 91 / 94 22.9 43.6 1.0X
+ xxHash 64-bit 68 / 69 30.6 32.7 1.3X
+ */
+ test(64 + 31, 42L, 1 << 10, 1 << 11)
+
+ /*
+ Intel(R) Core(TM) i7-4750HQ CPU @ 2.00GHz
+ Hash byte arrays with length 287: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
+ -------------------------------------------------------------------------------------------
+ Murmur3_x86_32 268 / 268 7.8 127.6 1.0X
+ xxHash 64-bit 108 / 109 19.4 51.6 2.5X
+ */
+ test(256 + 31, 42L, 1 << 10, 1 << 11)
+
+ /*
+ Intel(R) Core(TM) i7-4750HQ CPU @ 2.00GHz
+ Hash byte arrays with length 1055: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
+ -------------------------------------------------------------------------------------------
+ Murmur3_x86_32 942 / 945 2.2 449.4 1.0X
+ xxHash 64-bit 276 / 276 7.6 131.4 3.4X
+ */
+ test(1024 + 31, 42L, 1 << 10, 1 << 11)
+
+ /*
+ Intel(R) Core(TM) i7-4750HQ CPU @ 2.00GHz
+ Hash byte arrays with length 2079: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
+ -------------------------------------------------------------------------------------------
+ Murmur3_x86_32 1839 / 1843 1.1 876.8 1.0X
+ xxHash 64-bit 445 / 448 4.7 212.1 4.1X
+ */
+ test(2048 + 31, 42L, 1 << 10, 1 << 11)
+
+ /*
+ Intel(R) Core(TM) i7-4750HQ CPU @ 2.00GHz
+ Hash byte arrays with length 8223: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
+ -------------------------------------------------------------------------------------------
+ Murmur3_x86_32 7307 / 7310 0.3 3484.4 1.0X
+ xxHash 64-bit 1487 / 1488 1.4 709.1 4.9X
+ */
+ test(8192 + 31, 42L, 1 << 10, 1 << 11)
+ }
+}
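The commit message notes these numbers were gathered from SBT; with the test classes on the classpath, the benchmark objects can also be invoked directly (a hypothetical invocation, assuming a Scala REPL on the test classpath):

    // Prints the timing tables shown in the comments above.
    org.apache.spark.sql.HashBenchmark.main(Array.empty)
    org.apache.spark.sql.HashByteArrayBenchmark.main(Array.empty)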
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MiscFunctionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MiscFunctionsSuite.scala
index 60d50baf51..f5bafcc6a7 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MiscFunctionsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MiscFunctionsSuite.scala
@@ -76,7 +76,7 @@ class MiscFunctionsSuite extends SparkFunSuite with ExpressionEvalHelper {
private val mapOfString = MapType(StringType, StringType)
private val arrayOfUDT = ArrayType(new ExamplePointUDT, false)
- testMurmur3Hash(
+ testHash(
new StructType()
.add("null", NullType)
.add("boolean", BooleanType)
@@ -94,7 +94,7 @@ class MiscFunctionsSuite extends SparkFunSuite with ExpressionEvalHelper {
.add("timestamp", TimestampType)
.add("udt", new ExamplePointUDT))
- testMurmur3Hash(
+ testHash(
new StructType()
.add("arrayOfNull", arrayOfNull)
.add("arrayOfString", arrayOfString)
@@ -104,7 +104,7 @@ class MiscFunctionsSuite extends SparkFunSuite with ExpressionEvalHelper {
.add("arrayOfStruct", ArrayType(structOfString))
.add("arrayOfUDT", arrayOfUDT))
- testMurmur3Hash(
+ testHash(
new StructType()
.add("mapOfIntAndString", MapType(IntegerType, StringType))
.add("mapOfStringAndArray", MapType(StringType, arrayOfString))
@@ -114,7 +114,7 @@ class MiscFunctionsSuite extends SparkFunSuite with ExpressionEvalHelper {
.add("mapOfStructAndString", MapType(structOfString, StringType))
.add("mapOfStruct", MapType(structOfString, structOfString)))
- testMurmur3Hash(
+ testHash(
new StructType()
.add("structOfString", structOfString)
.add("structOfStructOfString", new StructType().add("struct", structOfString))
@@ -124,11 +124,11 @@ class MiscFunctionsSuite extends SparkFunSuite with ExpressionEvalHelper {
new StructType().add("array", arrayOfString).add("map", mapOfString))
.add("structOfUDT", structOfUDT))
- private def testMurmur3Hash(inputSchema: StructType): Unit = {
+ private def testHash(inputSchema: StructType): Unit = {
val inputGenerator = RandomDataGenerator.forType(inputSchema, nullable = false).get
val encoder = RowEncoder(inputSchema)
val seed = scala.util.Random.nextInt()
- test(s"murmur3 hash: ${inputSchema.simpleString}") {
+ test(s"murmur3/xxHash64 hash: ${inputSchema.simpleString}") {
for (_ <- 1 to 10) {
val input = encoder.toRow(inputGenerator.apply().asInstanceOf[Row]).asInstanceOf[UnsafeRow]
val literals = input.toSeq(inputSchema).zip(inputSchema.map(_.dataType)).map {
@@ -136,6 +136,7 @@ class MiscFunctionsSuite extends SparkFunSuite with ExpressionEvalHelper {
}
// Only test the interpreted version has same result with codegen version.
checkEvaluation(Murmur3Hash(literals, seed), Murmur3Hash(literals, seed).eval())
+ checkEvaluation(XxHash64(literals, seed), XxHash64(literals, seed).eval())
}
}
}