 common/unsafe/src/main/java/org/apache/spark/sql/catalyst/expressions/HiveHasher.java          |  49 +
 sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/misc.scala               | 391 ++-
 sql/catalyst/src/test/java/org/apache/spark/sql/catalyst/expressions/HiveHasherSuite.java      | 128 +
 sql/catalyst/src/test/scala/org/apache/spark/sql/HashBenchmark.scala                           |  93 +-
 sql/catalyst/src/test/scala/org/apache/spark/sql/HashByteArrayBenchmark.scala                  | 118 +-
 sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MiscFunctionsSuite.scala |   3 +-
 6 files changed, 631 insertions(+), 151 deletions(-)
diff --git a/common/unsafe/src/main/java/org/apache/spark/sql/catalyst/expressions/HiveHasher.java b/common/unsafe/src/main/java/org/apache/spark/sql/catalyst/expressions/HiveHasher.java
new file mode 100644
index 0000000000..c7ea9085eb
--- /dev/null
+++ b/common/unsafe/src/main/java/org/apache/spark/sql/catalyst/expressions/HiveHasher.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions;
+
+import org.apache.spark.unsafe.Platform;
+
+/**
+ * Simulates Hive's hashing function at
+ * org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils#hashcode()
+ */
+public class HiveHasher {
+
+ @Override
+ public String toString() {
+ return HiveHasher.class.getSimpleName();
+ }
+
+ public static int hashInt(int input) {
+ return input;
+ }
+
+ public static int hashLong(long input) {
+ return (int) ((input >>> 32) ^ input);
+ }
+
+ public static int hashUnsafeBytes(Object base, long offset, int lengthInBytes) {
+ assert (lengthInBytes >= 0): "lengthInBytes cannot be negative";
+ int result = 0;
+ for (int i = 0; i < lengthInBytes; i++) {
+ result = (result * 31) + (int) Platform.getByte(base, offset + i);
+ }
+ return result;
+ }
+}
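
The byte-at-a-time fold above (result = 31 * result + byte) matches java.lang.String.hashCode
for ASCII input, which is why Hive's text hashing agrees with Java string hashing. A minimal
sketch of that equivalence, assuming the common/unsafe classes are on the classpath; not part
of the patch:

    import org.apache.spark.sql.catalyst.expressions.HiveHasher
    import org.apache.spark.unsafe.Platform

    val s = "val_84"
    val bytes = s.getBytes("UTF-8")
    // Fold the UTF-8 bytes exactly as HiveHasher.hashUnsafeBytes does above.
    val hash = HiveHasher.hashUnsafeBytes(bytes, Platform.BYTE_ARRAY_OFFSET, bytes.length)
    assert(hash == s.hashCode)  // both compute h = 31 * h + b over the ASCII bytes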
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/misc.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/misc.scala
index dbb52a4bb1..138ef2a1dc 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/misc.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/misc.scala
@@ -259,7 +259,7 @@ abstract class HashExpression[E] extends Expression {
$childrenHash""")
}
- private def nullSafeElementHash(
+ protected def nullSafeElementHash(
input: String,
index: String,
nullable: Boolean,
@@ -276,76 +276,127 @@ abstract class HashExpression[E] extends Expression {
}
}
- @tailrec
- private def computeHash(
+ protected def genHashInt(i: String, result: String): String =
+ s"$result = $hasherClassName.hashInt($i, $result);"
+
+ protected def genHashLong(l: String, result: String): String =
+ s"$result = $hasherClassName.hashLong($l, $result);"
+
+ protected def genHashBytes(b: String, result: String): String = {
+ val offset = "Platform.BYTE_ARRAY_OFFSET"
+ s"$result = $hasherClassName.hashUnsafeBytes($b, $offset, $b.length, $result);"
+ }
+
+ protected def genHashBoolean(input: String, result: String): String =
+ genHashInt(s"$input ? 1 : 0", result)
+
+ protected def genHashFloat(input: String, result: String): String =
+ genHashInt(s"Float.floatToIntBits($input)", result)
+
+ protected def genHashDouble(input: String, result: String): String =
+ genHashLong(s"Double.doubleToLongBits($input)", result)
+
+ protected def genHashDecimal(
+ ctx: CodegenContext,
+ d: DecimalType,
input: String,
- dataType: DataType,
- result: String,
- ctx: CodegenContext): String = {
- val hasher = hasherClassName
-
- def hashInt(i: String): String = s"$result = $hasher.hashInt($i, $result);"
- def hashLong(l: String): String = s"$result = $hasher.hashLong($l, $result);"
- def hashBytes(b: String): String =
- s"$result = $hasher.hashUnsafeBytes($b, Platform.BYTE_ARRAY_OFFSET, $b.length, $result);"
-
- dataType match {
- case NullType => ""
- case BooleanType => hashInt(s"$input ? 1 : 0")
- case ByteType | ShortType | IntegerType | DateType => hashInt(input)
- case LongType | TimestampType => hashLong(input)
- case FloatType => hashInt(s"Float.floatToIntBits($input)")
- case DoubleType => hashLong(s"Double.doubleToLongBits($input)")
- case d: DecimalType =>
- if (d.precision <= Decimal.MAX_LONG_DIGITS) {
- hashLong(s"$input.toUnscaledLong()")
- } else {
- val bytes = ctx.freshName("bytes")
- s"""
+ result: String): String = {
+ if (d.precision <= Decimal.MAX_LONG_DIGITS) {
+ genHashLong(s"$input.toUnscaledLong()", result)
+ } else {
+ val bytes = ctx.freshName("bytes")
+ s"""
final byte[] $bytes = $input.toJavaBigDecimal().unscaledValue().toByteArray();
- ${hashBytes(bytes)}
+ ${genHashBytes(bytes, result)}
"""
+ }
+ }
+
+ protected def genHashCalendarInterval(input: String, result: String): String = {
+ val microsecondsHash = s"$hasherClassName.hashLong($input.microseconds, $result)"
+ s"$result = $hasherClassName.hashInt($input.months, $microsecondsHash);"
+ }
+
+ protected def genHashString(input: String, result: String): String = {
+ val baseObject = s"$input.getBaseObject()"
+ val baseOffset = s"$input.getBaseOffset()"
+ val numBytes = s"$input.numBytes()"
+ s"$result = $hasherClassName.hashUnsafeBytes($baseObject, $baseOffset, $numBytes, $result);"
+ }
+
+ protected def genHashForMap(
+ ctx: CodegenContext,
+ input: String,
+ result: String,
+ keyType: DataType,
+ valueType: DataType,
+ valueContainsNull: Boolean): String = {
+ val index = ctx.freshName("index")
+ val keys = ctx.freshName("keys")
+ val values = ctx.freshName("values")
+ s"""
+ final ArrayData $keys = $input.keyArray();
+ final ArrayData $values = $input.valueArray();
+ for (int $index = 0; $index < $input.numElements(); $index++) {
+ ${nullSafeElementHash(keys, index, false, keyType, result, ctx)}
+ ${nullSafeElementHash(values, index, valueContainsNull, valueType, result, ctx)}
}
- case CalendarIntervalType =>
- val microsecondsHash = s"$hasher.hashLong($input.microseconds, $result)"
- s"$result = $hasher.hashInt($input.months, $microsecondsHash);"
- case BinaryType => hashBytes(input)
- case StringType =>
- val baseObject = s"$input.getBaseObject()"
- val baseOffset = s"$input.getBaseOffset()"
- val numBytes = s"$input.numBytes()"
- s"$result = $hasher.hashUnsafeBytes($baseObject, $baseOffset, $numBytes, $result);"
-
- case ArrayType(et, containsNull) =>
- val index = ctx.freshName("index")
- s"""
- for (int $index = 0; $index < $input.numElements(); $index++) {
- ${nullSafeElementHash(input, index, containsNull, et, result, ctx)}
- }
- """
-
- case MapType(kt, vt, valueContainsNull) =>
- val index = ctx.freshName("index")
- val keys = ctx.freshName("keys")
- val values = ctx.freshName("values")
- s"""
- final ArrayData $keys = $input.keyArray();
- final ArrayData $values = $input.valueArray();
- for (int $index = 0; $index < $input.numElements(); $index++) {
- ${nullSafeElementHash(keys, index, false, kt, result, ctx)}
- ${nullSafeElementHash(values, index, valueContainsNull, vt, result, ctx)}
- }
- """
+ """
+ }
+
+ protected def genHashForArray(
+ ctx: CodegenContext,
+ input: String,
+ result: String,
+ elementType: DataType,
+ containsNull: Boolean): String = {
+ val index = ctx.freshName("index")
+ s"""
+ for (int $index = 0; $index < $input.numElements(); $index++) {
+ ${nullSafeElementHash(input, index, containsNull, elementType, result, ctx)}
+ }
+ """
+ }
- case StructType(fields) =>
- fields.zipWithIndex.map { case (field, index) =>
- nullSafeElementHash(input, index.toString, field.nullable, field.dataType, result, ctx)
- }.mkString("\n")
+ protected def genHashForStruct(
+ ctx: CodegenContext,
+ input: String,
+ result: String,
+ fields: Array[StructField]): String = {
+ fields.zipWithIndex.map { case (field, index) =>
+ nullSafeElementHash(input, index.toString, field.nullable, field.dataType, result, ctx)
+ }.mkString("\n")
+ }
- case udt: UserDefinedType[_] => computeHash(input, udt.sqlType, result, ctx)
- }
+ @tailrec
+ private def computeHashWithTailRec(
+ input: String,
+ dataType: DataType,
+ result: String,
+ ctx: CodegenContext): String = dataType match {
+ case NullType => ""
+ case BooleanType => genHashBoolean(input, result)
+ case ByteType | ShortType | IntegerType | DateType => genHashInt(input, result)
+ case LongType | TimestampType => genHashLong(input, result)
+ case FloatType => genHashFloat(input, result)
+ case DoubleType => genHashDouble(input, result)
+ case d: DecimalType => genHashDecimal(ctx, d, input, result)
+ case CalendarIntervalType => genHashCalendarInterval(input, result)
+ case BinaryType => genHashBytes(input, result)
+ case StringType => genHashString(input, result)
+ case ArrayType(et, containsNull) => genHashForArray(ctx, input, result, et, containsNull)
+ case MapType(kt, vt, valueContainsNull) =>
+ genHashForMap(ctx, input, result, kt, vt, valueContainsNull)
+ case StructType(fields) => genHashForStruct(ctx, input, result, fields)
+ case udt: UserDefinedType[_] => computeHashWithTailRec(input, udt.sqlType, result, ctx)
}
+ protected def computeHash(
+ input: String,
+ dataType: DataType,
+ result: String,
+ ctx: CodegenContext): String = computeHashWithTailRec(input, dataType, result, ctx)
+
protected def hasherClassName: String
}
@@ -568,3 +619,217 @@ case class CurrentDatabase() extends LeafExpression with Unevaluable {
override def foldable: Boolean = true
override def nullable: Boolean = false
}
+
+/**
+ * Simulates Hive's hashing function at
+ * org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils#hashcode()
+ *
+ * We should use this hash function for both shuffle and bucketing of Hive tables, so that
+ * we can guarantee shuffle and bucketing have the same data distribution.
+ *
+ * TODO: Support Decimal and date-related types.
+ */
+@ExpressionDescription(
+ usage = "_FUNC_(a1, a2, ...) - Returns a hash value of the arguments.")
+case class HiveHash(children: Seq[Expression]) extends HashExpression[Int] {
+ override val seed = 0
+
+ override def dataType: DataType = IntegerType
+
+ override def prettyName: String = "hive-hash"
+
+ override protected def hasherClassName: String = classOf[HiveHasher].getName
+
+ override protected def computeHash(value: Any, dataType: DataType, seed: Int): Int = {
+ HiveHashFunction.hash(value, dataType, seed).toInt
+ }
+
+ override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
+ ev.isNull = "false"
+ val childHash = ctx.freshName("childHash")
+ val childrenHash = children.map { child =>
+ val childGen = child.genCode(ctx)
+ childGen.code + ctx.nullSafeExec(child.nullable, childGen.isNull) {
+ computeHash(childGen.value, child.dataType, childHash, ctx)
+ } + s"${ev.value} = (31 * ${ev.value}) + $childHash;"
+ }.mkString(s"int $childHash = 0;", s"\n$childHash = 0;\n", "")
+
+ ev.copy(code = s"""
+ ${ctx.javaType(dataType)} ${ev.value} = $seed;
+ $childrenHash""")
+ }
+
+ override def eval(input: InternalRow): Int = {
+ var hash = seed
+ var i = 0
+ val len = children.length
+ while (i < len) {
+ hash = (31 * hash) + computeHash(children(i).eval(input), children(i).dataType, hash)
+ i += 1
+ }
+ hash
+ }
+
+ override protected def genHashInt(i: String, result: String): String =
+ s"$result = $hasherClassName.hashInt($i);"
+
+ override protected def genHashLong(l: String, result: String): String =
+ s"$result = $hasherClassName.hashLong($l);"
+
+ override protected def genHashBytes(b: String, result: String): String =
+ s"$result = $hasherClassName.hashUnsafeBytes($b, Platform.BYTE_ARRAY_OFFSET, $b.length);"
+
+ override protected def genHashCalendarInterval(input: String, result: String): String = {
+ s"""
+ $result = (31 * $hasherClassName.hashInt($input.months)) +
+ $hasherClassName.hashLong($input.microseconds);"
+ """
+ }
+
+ override protected def genHashString(input: String, result: String): String = {
+ val baseObject = s"$input.getBaseObject()"
+ val baseOffset = s"$input.getBaseOffset()"
+ val numBytes = s"$input.numBytes()"
+ s"$result = $hasherClassName.hashUnsafeBytes($baseObject, $baseOffset, $numBytes);"
+ }
+
+ override protected def genHashForArray(
+ ctx: CodegenContext,
+ input: String,
+ result: String,
+ elementType: DataType,
+ containsNull: Boolean): String = {
+ val index = ctx.freshName("index")
+ val childResult = ctx.freshName("childResult")
+ s"""
+ int $childResult = 0;
+ for (int $index = 0; $index < $input.numElements(); $index++) {
+ $childResult = 0;
+ ${nullSafeElementHash(input, index, containsNull, elementType, childResult, ctx)};
+ $result = (31 * $result) + $childResult;
+ }
+ """
+ }
+
+ override protected def genHashForMap(
+ ctx: CodegenContext,
+ input: String,
+ result: String,
+ keyType: DataType,
+ valueType: DataType,
+ valueContainsNull: Boolean): String = {
+ val index = ctx.freshName("index")
+ val keys = ctx.freshName("keys")
+ val values = ctx.freshName("values")
+ val keyResult = ctx.freshName("keyResult")
+ val valueResult = ctx.freshName("valueResult")
+ s"""
+ final ArrayData $keys = $input.keyArray();
+ final ArrayData $values = $input.valueArray();
+ int $keyResult = 0;
+ int $valueResult = 0;
+ for (int $index = 0; $index < $input.numElements(); $index++) {
+ $keyResult = 0;
+ ${nullSafeElementHash(keys, index, false, keyType, keyResult, ctx)}
+ $valueResult = 0;
+ ${nullSafeElementHash(values, index, valueContainsNull, valueType, valueResult, ctx)}
+ $result += $keyResult ^ $valueResult;
+ }
+ """
+ }
+
+ override protected def genHashForStruct(
+ ctx: CodegenContext,
+ input: String,
+ result: String,
+ fields: Array[StructField]): String = {
+ val localResult = ctx.freshName("localResult")
+ val childResult = ctx.freshName("childResult")
+ fields.zipWithIndex.map { case (field, index) =>
+ s"""
+ $childResult = 0;
+ ${nullSafeElementHash(input, index.toString, field.nullable, field.dataType,
+ childResult, ctx)}
+ $localResult = (31 * $localResult) + $childResult;
+ """
+ }.mkString(
+ s"""
+ int $localResult = 0;
+ int $childResult = 0;
+ """,
+ "",
+ s"$result = (31 * $result) + $localResult;"
+ )
+ }
+}
+
+object HiveHashFunction extends InterpretedHashFunction {
+ override protected def hashInt(i: Int, seed: Long): Long = {
+ HiveHasher.hashInt(i)
+ }
+
+ override protected def hashLong(l: Long, seed: Long): Long = {
+ HiveHasher.hashLong(l)
+ }
+
+ override protected def hashUnsafeBytes(base: AnyRef, offset: Long, len: Int, seed: Long): Long = {
+ HiveHasher.hashUnsafeBytes(base, offset, len)
+ }
+
+ override def hash(value: Any, dataType: DataType, seed: Long): Long = {
+ value match {
+ case null => 0
+ case array: ArrayData =>
+ val elementType = dataType match {
+ case udt: UserDefinedType[_] => udt.sqlType.asInstanceOf[ArrayType].elementType
+ case ArrayType(et, _) => et
+ }
+
+ var result = 0
+ var i = 0
+ val length = array.numElements()
+ while (i < length) {
+ result = (31 * result) + hash(array.get(i, elementType), elementType, 0).toInt
+ i += 1
+ }
+ result
+
+ case map: MapData =>
+ val (kt, vt) = dataType match {
+ case udt: UserDefinedType[_] =>
+ val mapType = udt.sqlType.asInstanceOf[MapType]
+ mapType.keyType -> mapType.valueType
+ case MapType(_kt, _vt, _) => _kt -> _vt
+ }
+ val keys = map.keyArray()
+ val values = map.valueArray()
+
+ var result = 0
+ var i = 0
+ val length = map.numElements()
+ while (i < length) {
+ result += hash(keys.get(i, kt), kt, 0).toInt ^ hash(values.get(i, vt), vt, 0).toInt
+ i += 1
+ }
+ result
+
+ case struct: InternalRow =>
+ val types: Array[DataType] = dataType match {
+ case udt: UserDefinedType[_] =>
+ udt.sqlType.asInstanceOf[StructType].map(_.dataType).toArray
+ case StructType(fields) => fields.map(_.dataType)
+ }
+
+ var result = 0
+ var i = 0
+ val length = struct.numFields
+ while (i < length) {
+ result = (31 * result) + hash(struct.get(i, types(i)), types(i), seed + 1).toInt
+ i += 1
+ }
+ result
+
+ case _ => super.hash(value, dataType, seed)
+ }
+ }
+}
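
A minimal usage sketch of the two interpreted entry points added above, assuming the catalyst
test classpath; not part of the patch. The HiveHash expression folds child hashes with
h = 31 * h + childHash, and HiveHashFunction applies the same 31-based fold to array elements:

    import org.apache.spark.sql.catalyst.expressions.{HiveHash, HiveHashFunction, Literal}
    import org.apache.spark.sql.catalyst.util.GenericArrayData
    import org.apache.spark.sql.types.{ArrayType, IntegerType}

    // Single int column: the seed is 0 and HiveHasher.hashInt is the identity,
    // so the row hash is 31 * 0 + 42 = 42.
    assert(HiveHash(Literal(42) :: Nil).eval() == 42)

    // Arrays fold element hashes with the same 31-based rule:
    // ((0 * 31 + 1) * 31 + 2) * 31 + 3 = 1026.
    val arr = new GenericArrayData(Array[Any](1, 2, 3))
    assert(HiveHashFunction.hash(arr, ArrayType(IntegerType), 0) == 1026)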
diff --git a/sql/catalyst/src/test/java/org/apache/spark/sql/catalyst/expressions/HiveHasherSuite.java b/sql/catalyst/src/test/java/org/apache/spark/sql/catalyst/expressions/HiveHasherSuite.java
new file mode 100644
index 0000000000..67a5eb0c7f
--- /dev/null
+++ b/sql/catalyst/src/test/java/org/apache/spark/sql/catalyst/expressions/HiveHasherSuite.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions;
+
+import org.apache.spark.unsafe.Platform;
+import org.apache.spark.unsafe.types.UTF8String;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.nio.charset.StandardCharsets;
+import java.util.HashSet;
+import java.util.Random;
+import java.util.Set;
+
+public class HiveHasherSuite {
+ private static final HiveHasher hasher = new HiveHasher();
+
+ @Test
+ public void testKnownIntegerInputs() {
+ int[] inputs = {0, Integer.MIN_VALUE, Integer.MAX_VALUE, 593689054, -189366624};
+ for (int input : inputs) {
+ Assert.assertEquals(input, HiveHasher.hashInt(input));
+ }
+ }
+
+ @Test
+ public void testKnownLongInputs() {
+ Assert.assertEquals(0, HiveHasher.hashLong(0L));
+ Assert.assertEquals(41, HiveHasher.hashLong(-42L));
+ Assert.assertEquals(42, HiveHasher.hashLong(42L));
+ Assert.assertEquals(-2147483648, HiveHasher.hashLong(Long.MIN_VALUE));
+ Assert.assertEquals(-2147483648, HiveHasher.hashLong(Long.MAX_VALUE));
+ }
+
+ @Test
+ public void testKnownStringAndIntInputs() {
+ int[] inputs = {84, 19, 8};
+ int[] expected = {-823832826, -823835053, 111972242};
+
+ for (int i = 0; i < inputs.length; i++) {
+ UTF8String s = UTF8String.fromString("val_" + inputs[i]);
+ int hash = HiveHasher.hashUnsafeBytes(s.getBaseObject(), s.getBaseOffset(), s.numBytes());
+ Assert.assertEquals(expected[i], ((31 * inputs[i]) + hash));
+ }
+ }
+
+ @Test
+ public void randomizedStressTest() {
+ int size = 65536;
+ Random rand = new Random();
+
+ // A set used to track collision rate.
+ Set<Integer> hashcodes = new HashSet<>();
+ for (int i = 0; i < size; i++) {
+ int vint = rand.nextInt();
+ long lint = rand.nextLong();
+ Assert.assertEquals(HiveHasher.hashInt(vint), HiveHasher.hashInt(vint));
+ Assert.assertEquals(HiveHasher.hashLong(lint), HiveHasher.hashLong(lint));
+
+ hashcodes.add(HiveHasher.hashLong(lint));
+ }
+
+ // A very loose bound.
+ Assert.assertTrue(hashcodes.size() > size * 0.95);
+ }
+
+ @Test
+ public void randomizedStressTestBytes() {
+ int size = 65536;
+ Random rand = new Random();
+
+ // A set used to track collision rate.
+ Set<Integer> hashcodes = new HashSet<>();
+ for (int i = 0; i < size; i++) {
+ int byteArrSize = rand.nextInt(100) * 8;
+ byte[] bytes = new byte[byteArrSize];
+ rand.nextBytes(bytes);
+
+ Assert.assertEquals(
+ HiveHasher.hashUnsafeBytes(bytes, Platform.BYTE_ARRAY_OFFSET, byteArrSize),
+ HiveHasher.hashUnsafeBytes(bytes, Platform.BYTE_ARRAY_OFFSET, byteArrSize));
+
+ hashcodes.add(HiveHasher.hashUnsafeBytes(
+ bytes, Platform.BYTE_ARRAY_OFFSET, byteArrSize));
+ }
+
+ // A very loose bound.
+ Assert.assertTrue(hashcodes.size() > size * 0.95);
+ }
+
+ @Test
+ public void randomizedStressTestPaddedStrings() {
+ int size = 64000;
+ // A set used to track collision rate.
+ Set<Integer> hashcodes = new HashSet<>();
+ for (int i = 0; i < size; i++) {
+ int byteArrSize = 8;
+ byte[] strBytes = String.valueOf(i).getBytes(StandardCharsets.UTF_8);
+ byte[] paddedBytes = new byte[byteArrSize];
+ System.arraycopy(strBytes, 0, paddedBytes, 0, strBytes.length);
+
+ Assert.assertEquals(
+ HiveHasher.hashUnsafeBytes(paddedBytes, Platform.BYTE_ARRAY_OFFSET, byteArrSize),
+ HiveHasher.hashUnsafeBytes(paddedBytes, Platform.BYTE_ARRAY_OFFSET, byteArrSize));
+
+ hashcodes.add(HiveHasher.hashUnsafeBytes(
+ paddedBytes, Platform.BYTE_ARRAY_OFFSET, byteArrSize));
+ }
+
+ // A very loose bound.
+ Assert.assertTrue(hashcodes.size() > size * 0.95);
+ }
+}
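
The expected values hard-coded in testKnownStringAndIntInputs follow from the row fold
h = 31 * h + fieldHash; for example, the first entry can be reproduced through the HiveHash
expression directly. A sketch, assuming the catalyst classpath; not part of the patch:

    import org.apache.spark.sql.catalyst.expressions.{HiveHash, Literal}

    // 31 * hashInt(84) + hash("val_84") == 31 * 84 + "val_84".hashCode == -823832826
    val row = HiveHash(Literal(84) :: Literal("val_84") :: Nil)
    assert(row.eval() == -823832826)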
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/HashBenchmark.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/HashBenchmark.scala
index c6a1a2be0d..2d94b66a1e 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/HashBenchmark.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/HashBenchmark.scala
@@ -42,8 +42,8 @@ object HashBenchmark {
val benchmark = new Benchmark("Hash For " + name, iters * numRows)
benchmark.addCase("interpreted version") { _: Int =>
+ var sum = 0
for (_ <- 0L until iters) {
- var sum = 0
var i = 0
while (i < numRows) {
sum += rows(i).hashCode()
@@ -54,8 +54,8 @@ object HashBenchmark {
val getHashCode = UnsafeProjection.create(new Murmur3Hash(attrs) :: Nil, attrs)
benchmark.addCase("codegen version") { _: Int =>
+ var sum = 0
for (_ <- 0L until iters) {
- var sum = 0
var i = 0
while (i < numRows) {
sum += getHashCode(rows(i)).getInt(0)
@@ -66,8 +66,8 @@ object HashBenchmark {
val getHashCode64b = UnsafeProjection.create(new XxHash64(attrs) :: Nil, attrs)
benchmark.addCase("codegen version 64-bit") { _: Int =>
+ var sum = 0
for (_ <- 0L until iters) {
- var sum = 0
var i = 0
while (i < numRows) {
sum += getHashCode64b(rows(i)).getInt(0)
@@ -76,30 +76,44 @@ object HashBenchmark {
}
}
+ val getHiveHashCode = UnsafeProjection.create(new HiveHash(attrs) :: Nil, attrs)
+ benchmark.addCase("codegen HiveHash version") { _: Int =>
+ var sum = 0
+ for (_ <- 0L until iters) {
+ var i = 0
+ while (i < numRows) {
+ sum += getHiveHashCode(rows(i)).getInt(0)
+ i += 1
+ }
+ }
+ }
+
benchmark.run()
}
def main(args: Array[String]): Unit = {
val singleInt = new StructType().add("i", IntegerType)
/*
- Intel(R) Core(TM) i7-4750HQ CPU @ 2.00GHz
- Hash For single ints: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
- -------------------------------------------------------------------------------------------
- interpreted version 1006 / 1011 133.4 7.5 1.0X
- codegen version 1835 / 1839 73.1 13.7 0.5X
- codegen version 64-bit 1627 / 1628 82.5 12.1 0.6X
- */
+ Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz
+ Hash For single ints: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
+ ------------------------------------------------------------------------------------------------
+ interpreted version 3262 / 3267 164.6 6.1 1.0X
+ codegen version 6448 / 6718 83.3 12.0 0.5X
+ codegen version 64-bit 6088 / 6154 88.2 11.3 0.5X
+ codegen HiveHash version 4732 / 4745 113.5 8.8 0.7X
+ */
test("single ints", singleInt, 1 << 15, 1 << 14)
val singleLong = new StructType().add("i", LongType)
/*
- Intel(R) Core(TM) i7-4750HQ CPU @ 2.00GHz
- Hash For single longs: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
- -------------------------------------------------------------------------------------------
- interpreted version 1196 / 1209 112.2 8.9 1.0X
- codegen version 2178 / 2181 61.6 16.2 0.5X
- codegen version 64-bit 1752 / 1753 76.6 13.1 0.7X
- */
+ Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz
+ Hash For single longs: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
+ ------------------------------------------------------------------------------------------------
+ interpreted version 3716 / 3726 144.5 6.9 1.0X
+ codegen version 7706 / 7732 69.7 14.4 0.5X
+ codegen version 64-bit 6370 / 6399 84.3 11.9 0.6X
+ codegen HiveHash version 4924 / 5026 109.0 9.2 0.8X
+ */
test("single longs", singleLong, 1 << 15, 1 << 14)
val normal = new StructType()
@@ -118,13 +132,14 @@ object HashBenchmark {
.add("date", DateType)
.add("timestamp", TimestampType)
/*
- Intel(R) Core(TM) i7-4750HQ CPU @ 2.00GHz
- Hash For normal: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
- -------------------------------------------------------------------------------------------
- interpreted version 2713 / 2715 0.8 1293.5 1.0X
- codegen version 2015 / 2018 1.0 960.9 1.3X
- codegen version 64-bit 735 / 738 2.9 350.7 3.7X
- */
+ Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz
+ Hash For normal: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
+ ------------------------------------------------------------------------------------------------
+ interpreted version 2985 / 3013 0.7 1423.4 1.0X
+ codegen version 2422 / 2434 0.9 1155.1 1.2X
+ codegen version 64-bit 856 / 920 2.5 408.0 3.5X
+ codegen HiveHash version 4501 / 4979 0.5 2146.4 0.7X
+ */
test("normal", normal, 1 << 10, 1 << 11)
val arrayOfInt = ArrayType(IntegerType)
@@ -132,13 +147,14 @@ object HashBenchmark {
.add("array", arrayOfInt)
.add("arrayOfArray", ArrayType(arrayOfInt))
/*
- Intel(R) Core(TM) i7-4750HQ CPU @ 2.00GHz
- Hash For array: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
- -------------------------------------------------------------------------------------------
- interpreted version 1498 / 1499 0.1 11432.1 1.0X
- codegen version 2642 / 2643 0.0 20158.4 0.6X
- codegen version 64-bit 2421 / 2424 0.1 18472.5 0.6X
- */
+ Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz
+ Hash For array: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
+ ------------------------------------------------------------------------------------------------
+ interpreted version 3100 / 3555 0.0 23651.8 1.0X
+ codegen version 5779 / 5865 0.0 44088.4 0.5X
+ codegen version 64-bit 4738 / 4821 0.0 36151.7 0.7X
+ codegen HiveHash version 2200 / 2246 0.1 16785.9 1.4X
+ */
test("array", array, 1 << 8, 1 << 9)
val mapOfInt = MapType(IntegerType, IntegerType)
@@ -146,13 +162,14 @@ object HashBenchmark {
.add("map", mapOfInt)
.add("mapOfMap", MapType(IntegerType, mapOfInt))
/*
- Intel(R) Core(TM) i7-4750HQ CPU @ 2.00GHz
- Hash For map: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
- -------------------------------------------------------------------------------------------
- interpreted version 1612 / 1618 0.0 393553.4 1.0X
- codegen version 149 / 150 0.0 36381.2 10.8X
- codegen version 64-bit 144 / 145 0.0 35122.1 11.2X
- */
+ Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz
+ Hash For map: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
+ ------------------------------------------------------------------------------------------------
+ interpreted version 0 / 0 48.1 20.8 1.0X
+ codegen version 257 / 275 0.0 62768.7 0.0X
+ codegen version 64-bit 226 / 240 0.0 55224.5 0.0X
+ codegen HiveHash version 89 / 96 0.0 21708.8 0.0X
+ */
test("map", map, 1 << 6, 1 << 6)
}
}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/HashByteArrayBenchmark.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/HashByteArrayBenchmark.scala
index 53f21a8442..2a753a0c84 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/HashByteArrayBenchmark.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/HashByteArrayBenchmark.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql
import java.util.Random
-import org.apache.spark.sql.catalyst.expressions.XXH64
+import org.apache.spark.sql.catalyst.expressions.{HiveHasher, XXH64}
import org.apache.spark.unsafe.Platform
import org.apache.spark.unsafe.hash.Murmur3_x86_32
import org.apache.spark.util.Benchmark
@@ -38,8 +38,8 @@ object HashByteArrayBenchmark {
val benchmark = new Benchmark("Hash byte arrays with length " + length, iters * numArrays)
benchmark.addCase("Murmur3_x86_32") { _: Int =>
+ var sum = 0L
for (_ <- 0L until iters) {
- var sum = 0
var i = 0
while (i < numArrays) {
sum += Murmur3_x86_32.hashUnsafeBytes(arrays(i), Platform.BYTE_ARRAY_OFFSET, length, 42)
@@ -49,8 +49,8 @@ object HashByteArrayBenchmark {
}
benchmark.addCase("xxHash 64-bit") { _: Int =>
+ var sum = 0L
for (_ <- 0L until iters) {
- var sum = 0L
var i = 0
while (i < numArrays) {
sum += XXH64.hashUnsafeBytes(arrays(i), Platform.BYTE_ARRAY_OFFSET, length, 42)
@@ -59,90 +59,110 @@ object HashByteArrayBenchmark {
}
}
+ benchmark.addCase("HiveHasher") { _: Int =>
+ var sum = 0L
+ for (_ <- 0L until iters) {
+ var i = 0
+ while (i < numArrays) {
+ sum += HiveHasher.hashUnsafeBytes(arrays(i), Platform.BYTE_ARRAY_OFFSET, length)
+ i += 1
+ }
+ }
+ }
+
benchmark.run()
}
def main(args: Array[String]): Unit = {
/*
- Intel(R) Core(TM) i7-4750HQ CPU @ 2.00GHz
- Hash byte arrays with length 8: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
- -------------------------------------------------------------------------------------------
- Murmur3_x86_32 11 / 12 185.1 5.4 1.0X
- xxHash 64-bit 17 / 18 120.0 8.3 0.6X
+ Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz
+ Hash byte arrays with length 8: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
+ ------------------------------------------------------------------------------------------------
+ Murmur3_x86_32 12 / 16 174.3 5.7 1.0X
+ xxHash 64-bit 17 / 22 120.0 8.3 0.7X
+ HiveHasher 13 / 15 162.1 6.2 0.9X
*/
test(8, 42L, 1 << 10, 1 << 11)
/*
- Intel(R) Core(TM) i7-4750HQ CPU @ 2.00GHz
- Hash byte arrays with length 16: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
- -------------------------------------------------------------------------------------------
- Murmur3_x86_32 18 / 18 118.6 8.4 1.0X
- xxHash 64-bit 20 / 21 102.5 9.8 0.9X
+ Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz
+ Hash byte arrays with length 16: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
+ ------------------------------------------------------------------------------------------------
+ Murmur3_x86_32 19 / 22 107.6 9.3 1.0X
+ xxHash 64-bit 20 / 24 104.6 9.6 1.0X
+ HiveHasher 24 / 28 87.0 11.5 0.8X
*/
test(16, 42L, 1 << 10, 1 << 11)
/*
- Intel(R) Core(TM) i7-4750HQ CPU @ 2.00GHz
- Hash byte arrays with length 24: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
- -------------------------------------------------------------------------------------------
- Murmur3_x86_32 24 / 24 86.6 11.5 1.0X
- xxHash 64-bit 23 / 23 93.2 10.7 1.1X
+ Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz
+ Hash byte arrays with length 24: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
+ ------------------------------------------------------------------------------------------------
+ Murmur3_x86_32 28 / 32 74.8 13.4 1.0X
+ xxHash 64-bit 24 / 29 87.3 11.5 1.2X
+ HiveHasher 36 / 41 57.7 17.3 0.8X
*/
test(24, 42L, 1 << 10, 1 << 11)
// Add 31 to all arrays to create worst-case alignment for xxHash.
/*
- Intel(R) Core(TM) i7-4750HQ CPU @ 2.00GHz
- Hash byte arrays with length 31: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
- -------------------------------------------------------------------------------------------
- Murmur3_x86_32 38 / 39 54.7 18.3 1.0X
- xxHash 64-bit 33 / 33 64.4 15.5 1.2X
+ Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz
+ Hash byte arrays with length 31: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
+ ------------------------------------------------------------------------------------------------
+ Murmur3_x86_32 41 / 45 51.1 19.6 1.0X
+ xxHash 64-bit 36 / 44 58.8 17.0 1.2X
+ HiveHasher 49 / 54 42.6 23.5 0.8X
*/
test(31, 42L, 1 << 10, 1 << 11)
/*
- Intel(R) Core(TM) i7-4750HQ CPU @ 2.00GHz
- Hash byte arrays with length 95: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
- -------------------------------------------------------------------------------------------
- Murmur3_x86_32 91 / 94 22.9 43.6 1.0X
- xxHash 64-bit 68 / 69 30.6 32.7 1.3X
+ Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz
+ Hash byte arrays with length 95: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
+ ------------------------------------------------------------------------------------------------
+ Murmur3_x86_32 100 / 110 21.0 47.7 1.0X
+ xxHash 64-bit 74 / 78 28.2 35.5 1.3X
+ HiveHasher 189 / 196 11.1 90.3 0.5X
*/
test(64 + 31, 42L, 1 << 10, 1 << 11)
/*
- Intel(R) Core(TM) i7-4750HQ CPU @ 2.00GHz
- Hash byte arrays with length 287: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
- -------------------------------------------------------------------------------------------
- Murmur3_x86_32 268 / 268 7.8 127.6 1.0X
- xxHash 64-bit 108 / 109 19.4 51.6 2.5X
+ Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz
+ Hash byte arrays with length 287: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
+ ------------------------------------------------------------------------------------------------
+ Murmur3_x86_32 299 / 311 7.0 142.4 1.0X
+ xxHash 64-bit 113 / 122 18.5 54.1 2.6X
+ HiveHasher 620 / 624 3.4 295.5 0.5X
*/
test(256 + 31, 42L, 1 << 10, 1 << 11)
/*
- Intel(R) Core(TM) i7-4750HQ CPU @ 2.00GHz
- Hash byte arrays with length 1055: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
- -------------------------------------------------------------------------------------------
- Murmur3_x86_32 942 / 945 2.2 449.4 1.0X
- xxHash 64-bit 276 / 276 7.6 131.4 3.4X
+ Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz
+ Hash byte arrays with length 1055: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
+ ------------------------------------------------------------------------------------------------
+ Murmur3_x86_32 1068 / 1070 2.0 509.1 1.0X
+ xxHash 64-bit 306 / 315 6.9 145.9 3.5X
+ HiveHasher 2316 / 2369 0.9 1104.3 0.5X
*/
test(1024 + 31, 42L, 1 << 10, 1 << 11)
/*
- Intel(R) Core(TM) i7-4750HQ CPU @ 2.00GHz
- Hash byte arrays with length 2079: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
- -------------------------------------------------------------------------------------------
- Murmur3_x86_32 1839 / 1843 1.1 876.8 1.0X
- xxHash 64-bit 445 / 448 4.7 212.1 4.1X
+ Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz
+ Hash byte arrays with length 2079: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
+ ------------------------------------------------------------------------------------------------
+ Murmur3_x86_32 2252 / 2274 0.9 1074.1 1.0X
+ xxHash 64-bit 534 / 580 3.9 254.6 4.2X
+ HiveHasher 4739 / 4786 0.4 2259.8 0.5X
*/
test(2048 + 31, 42L, 1 << 10, 1 << 11)
/*
- Intel(R) Core(TM) i7-4750HQ CPU @ 2.00GHz
- Hash byte arrays with length 8223: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
- -------------------------------------------------------------------------------------------
- Murmur3_x86_32 7307 / 7310 0.3 3484.4 1.0X
- xxHash 64-bit 1487 / 1488 1.4 709.1 4.9X
- */
+ Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz
+ Hash byte arrays with length 8223: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
+ ------------------------------------------------------------------------------------------------
+ Murmur3_x86_32 9249 / 9586 0.2 4410.5 1.0X
+ xxHash 64-bit 2897 / 3241 0.7 1381.6 3.2X
+ HiveHasher 19392 / 20211 0.1 9246.6 0.5X
+ */
test(8192 + 31, 42L, 1 << 10, 1 << 11)
}
}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MiscFunctionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MiscFunctionsSuite.scala
index 33916c0891..13ce588462 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MiscFunctionsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MiscFunctionsSuite.scala
@@ -145,7 +145,7 @@ class MiscFunctionsSuite extends SparkFunSuite with ExpressionEvalHelper {
val inputGenerator = RandomDataGenerator.forType(inputSchema, nullable = false).get
val encoder = RowEncoder(inputSchema)
val seed = scala.util.Random.nextInt()
- test(s"murmur3/xxHash64 hash: ${inputSchema.simpleString}") {
+ test(s"murmur3/xxHash64/hive hash: ${inputSchema.simpleString}") {
for (_ <- 1 to 10) {
val input = encoder.toRow(inputGenerator.apply().asInstanceOf[Row]).asInstanceOf[UnsafeRow]
val literals = input.toSeq(inputSchema).zip(inputSchema.map(_.dataType)).map {
@@ -154,6 +154,7 @@ class MiscFunctionsSuite extends SparkFunSuite with ExpressionEvalHelper {
// Only test that the interpreted version has the same result as the codegen version.
checkEvaluation(Murmur3Hash(literals, seed), Murmur3Hash(literals, seed).eval())
checkEvaluation(XxHash64(literals, seed), XxHash64(literals, seed).eval())
+ checkEvaluation(HiveHash(literals), HiveHash(literals).eval())
}
}
}