author     Tejas Patil <tejasp@fb.com>  2016-10-04 18:59:31 -0700
committer  Herman van Hovell <hvanhovell@databricks.com>  2016-10-04 18:59:31 -0700
commit     a99743d053e84f695dc3034550939555297b0a05 (patch)
tree       566a00324e1d3fdabc416e31efd3c25a3e6cf2cb
parent     8d969a2125d915da1506c17833aa98da614a257f (diff)
[SPARK-17495][SQL] Add Hash capability semantically equivalent to Hive's
## What changes were proposed in this pull request?

Jira: https://issues.apache.org/jira/browse/SPARK-17495

Spark internally uses Murmur3Hash for partitioning. This differs from the hash function used by Hive, so for queries that use bucketing, running the same query on both engines can produce different results. We want users to have backward compatibility, so that parts of an application can be switched across the engines without observing regressions.

This PR adds `HiveHash`, `HiveHashFunction`, and `HiveHasher`, which mimic Hive's hashing at https://github.com/apache/hive/blob/master/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ObjectInspectorUtils.java#L638

I am intentionally not introducing any usages of this hash function in the rest of the code, to keep this PR small. My eventual goal is to have Hive bucketing support in Spark. Once this PR gets in, I will make the hash function pluggable in the relevant areas (e.g. `HashPartitioning`'s `partitionIdExpression` has Murmur3 hardcoded: https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala#L265).

## How was this patch tested?

Added `HiveHasherSuite` and extended `MiscFunctionsSuite` to cover `HiveHash`.

Author: Tejas Patil <tejasp@fb.com>

Closes #15047 from tejasapatil/SPARK-17495_hive_hash.
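The divergence is easy to demonstrate. A minimal sketch (not part of the patch; it assumes only the Catalyst expression API as of this commit) that evaluates both hash expressions on the same key:

```scala
import org.apache.spark.sql.catalyst.expressions.{HiveHash, Literal, Murmur3Hash}

// Evaluate both hash expressions on the same literal key. Because the two
// functions disagree, a Spark-bucketed table and a Hive-bucketed table place
// the same row in different buckets.
val key = Literal(42)
val murmur3 = Murmur3Hash(Seq(key), 42).eval() // Spark's default, seed 42
val hive = HiveHash(Seq(key)).eval()           // Hive semantics, fixed seed 0
println(s"murmur3=$murmur3 hive=$hive")        // prints two different values
```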
-rw-r--r--  common/unsafe/src/main/java/org/apache/spark/sql/catalyst/expressions/HiveHasher.java          49
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/misc.scala              391
-rw-r--r--  sql/catalyst/src/test/java/org/apache/spark/sql/catalyst/expressions/HiveHasherSuite.java     128
-rw-r--r--  sql/catalyst/src/test/scala/org/apache/spark/sql/HashBenchmark.scala                           93
-rw-r--r--  sql/catalyst/src/test/scala/org/apache/spark/sql/HashByteArrayBenchmark.scala                 118
-rw-r--r--  sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MiscFunctionsSuite.scala  3
6 files changed, 631 insertions(+), 151 deletions(-)
diff --git a/common/unsafe/src/main/java/org/apache/spark/sql/catalyst/expressions/HiveHasher.java b/common/unsafe/src/main/java/org/apache/spark/sql/catalyst/expressions/HiveHasher.java
new file mode 100644
index 0000000000..c7ea9085eb
--- /dev/null
+++ b/common/unsafe/src/main/java/org/apache/spark/sql/catalyst/expressions/HiveHasher.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions;
+
+import org.apache.spark.unsafe.Platform;
+
+/**
+ * Simulates Hive's hashing function at
+ * org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils#hashcode()
+ */
+public class HiveHasher {
+
+ @Override
+ public String toString() {
+ return HiveHasher.class.getSimpleName();
+ }
+
+ public static int hashInt(int input) {
+ return input;
+ }
+
+ public static int hashLong(long input) {
+ return (int) ((input >>> 32) ^ input);
+ }
+
+ public static int hashUnsafeBytes(Object base, long offset, int lengthInBytes) {
+ assert (lengthInBytes >= 0): "lengthInBytes cannot be negative";
+ int result = 0;
+ for (int i = 0; i < lengthInBytes; i++) {
+ result = (result * 31) + (int) Platform.getByte(base, offset + i);
+ }
+ return result;
+ }
+}
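For ASCII input, the 31-based byte polynomial in `hashUnsafeBytes` coincides with `java.lang.String#hashCode`, which is the value Hive's ObjectInspectorUtils produces for strings. A quick sanity check, written as a sketch rather than as part of the patch:

```scala
import java.nio.charset.StandardCharsets

import org.apache.spark.sql.catalyst.expressions.HiveHasher
import org.apache.spark.unsafe.Platform

// For ASCII strings every byte equals the corresponding char, so the
// 31-based polynomial over bytes reproduces java.lang.String#hashCode.
val s = "val_84"
val bytes = s.getBytes(StandardCharsets.UTF_8)
val h = HiveHasher.hashUnsafeBytes(bytes, Platform.BYTE_ARRAY_OFFSET, bytes.length)
assert(h == s.hashCode)
```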
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/misc.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/misc.scala
index dbb52a4bb1..138ef2a1dc 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/misc.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/misc.scala
@@ -259,7 +259,7 @@ abstract class HashExpression[E] extends Expression {
$childrenHash""")
}
- private def nullSafeElementHash(
+ protected def nullSafeElementHash(
input: String,
index: String,
nullable: Boolean,
@@ -276,76 +276,127 @@ abstract class HashExpression[E] extends Expression {
}
}
- @tailrec
- private def computeHash(
+ protected def genHashInt(i: String, result: String): String =
+ s"$result = $hasherClassName.hashInt($i, $result);"
+
+ protected def genHashLong(l: String, result: String): String =
+ s"$result = $hasherClassName.hashLong($l, $result);"
+
+ protected def genHashBytes(b: String, result: String): String = {
+ val offset = "Platform.BYTE_ARRAY_OFFSET"
+ s"$result = $hasherClassName.hashUnsafeBytes($b, $offset, $b.length, $result);"
+ }
+
+ protected def genHashBoolean(input: String, result: String): String =
+ genHashInt(s"$input ? 1 : 0", result)
+
+ protected def genHashFloat(input: String, result: String): String =
+ genHashInt(s"Float.floatToIntBits($input)", result)
+
+ protected def genHashDouble(input: String, result: String): String =
+ genHashLong(s"Double.doubleToLongBits($input)", result)
+
+ protected def genHashDecimal(
+ ctx: CodegenContext,
+ d: DecimalType,
input: String,
- dataType: DataType,
- result: String,
- ctx: CodegenContext): String = {
- val hasher = hasherClassName
-
- def hashInt(i: String): String = s"$result = $hasher.hashInt($i, $result);"
- def hashLong(l: String): String = s"$result = $hasher.hashLong($l, $result);"
- def hashBytes(b: String): String =
- s"$result = $hasher.hashUnsafeBytes($b, Platform.BYTE_ARRAY_OFFSET, $b.length, $result);"
-
- dataType match {
- case NullType => ""
- case BooleanType => hashInt(s"$input ? 1 : 0")
- case ByteType | ShortType | IntegerType | DateType => hashInt(input)
- case LongType | TimestampType => hashLong(input)
- case FloatType => hashInt(s"Float.floatToIntBits($input)")
- case DoubleType => hashLong(s"Double.doubleToLongBits($input)")
- case d: DecimalType =>
- if (d.precision <= Decimal.MAX_LONG_DIGITS) {
- hashLong(s"$input.toUnscaledLong()")
- } else {
- val bytes = ctx.freshName("bytes")
- s"""
+ result: String): String = {
+ if (d.precision <= Decimal.MAX_LONG_DIGITS) {
+ genHashLong(s"$input.toUnscaledLong()", result)
+ } else {
+ val bytes = ctx.freshName("bytes")
+ s"""
final byte[] $bytes = $input.toJavaBigDecimal().unscaledValue().toByteArray();
- ${hashBytes(bytes)}
+ ${genHashBytes(bytes, result)}
"""
+ }
+ }
+
+ protected def genHashCalendarInterval(input: String, result: String): String = {
+ val microsecondsHash = s"$hasherClassName.hashLong($input.microseconds, $result)"
+ s"$result = $hasherClassName.hashInt($input.months, $microsecondsHash);"
+ }
+
+ protected def genHashString(input: String, result: String): String = {
+ val baseObject = s"$input.getBaseObject()"
+ val baseOffset = s"$input.getBaseOffset()"
+ val numBytes = s"$input.numBytes()"
+ s"$result = $hasherClassName.hashUnsafeBytes($baseObject, $baseOffset, $numBytes, $result);"
+ }
+
+ protected def genHashForMap(
+ ctx: CodegenContext,
+ input: String,
+ result: String,
+ keyType: DataType,
+ valueType: DataType,
+ valueContainsNull: Boolean): String = {
+ val index = ctx.freshName("index")
+ val keys = ctx.freshName("keys")
+ val values = ctx.freshName("values")
+ s"""
+ final ArrayData $keys = $input.keyArray();
+ final ArrayData $values = $input.valueArray();
+ for (int $index = 0; $index < $input.numElements(); $index++) {
+ ${nullSafeElementHash(keys, index, false, keyType, result, ctx)}
+ ${nullSafeElementHash(values, index, valueContainsNull, valueType, result, ctx)}
}
- case CalendarIntervalType =>
- val microsecondsHash = s"$hasher.hashLong($input.microseconds, $result)"
- s"$result = $hasher.hashInt($input.months, $microsecondsHash);"
- case BinaryType => hashBytes(input)
- case StringType =>
- val baseObject = s"$input.getBaseObject()"
- val baseOffset = s"$input.getBaseOffset()"
- val numBytes = s"$input.numBytes()"
- s"$result = $hasher.hashUnsafeBytes($baseObject, $baseOffset, $numBytes, $result);"
-
- case ArrayType(et, containsNull) =>
- val index = ctx.freshName("index")
- s"""
- for (int $index = 0; $index < $input.numElements(); $index++) {
- ${nullSafeElementHash(input, index, containsNull, et, result, ctx)}
- }
- """
-
- case MapType(kt, vt, valueContainsNull) =>
- val index = ctx.freshName("index")
- val keys = ctx.freshName("keys")
- val values = ctx.freshName("values")
- s"""
- final ArrayData $keys = $input.keyArray();
- final ArrayData $values = $input.valueArray();
- for (int $index = 0; $index < $input.numElements(); $index++) {
- ${nullSafeElementHash(keys, index, false, kt, result, ctx)}
- ${nullSafeElementHash(values, index, valueContainsNull, vt, result, ctx)}
- }
- """
+ """
+ }
+
+ protected def genHashForArray(
+ ctx: CodegenContext,
+ input: String,
+ result: String,
+ elementType: DataType,
+ containsNull: Boolean): String = {
+ val index = ctx.freshName("index")
+ s"""
+ for (int $index = 0; $index < $input.numElements(); $index++) {
+ ${nullSafeElementHash(input, index, containsNull, elementType, result, ctx)}
+ }
+ """
+ }
- case StructType(fields) =>
- fields.zipWithIndex.map { case (field, index) =>
- nullSafeElementHash(input, index.toString, field.nullable, field.dataType, result, ctx)
- }.mkString("\n")
+ protected def genHashForStruct(
+ ctx: CodegenContext,
+ input: String,
+ result: String,
+ fields: Array[StructField]): String = {
+ fields.zipWithIndex.map { case (field, index) =>
+ nullSafeElementHash(input, index.toString, field.nullable, field.dataType, result, ctx)
+ }.mkString("\n")
+ }
- case udt: UserDefinedType[_] => computeHash(input, udt.sqlType, result, ctx)
- }
+ @tailrec
+ private def computeHashWithTailRec(
+ input: String,
+ dataType: DataType,
+ result: String,
+ ctx: CodegenContext): String = dataType match {
+ case NullType => ""
+ case BooleanType => genHashBoolean(input, result)
+ case ByteType | ShortType | IntegerType | DateType => genHashInt(input, result)
+ case LongType | TimestampType => genHashLong(input, result)
+ case FloatType => genHashFloat(input, result)
+ case DoubleType => genHashDouble(input, result)
+ case d: DecimalType => genHashDecimal(ctx, d, input, result)
+ case CalendarIntervalType => genHashCalendarInterval(input, result)
+ case BinaryType => genHashBytes(input, result)
+ case StringType => genHashString(input, result)
+ case ArrayType(et, containsNull) => genHashForArray(ctx, input, result, et, containsNull)
+ case MapType(kt, vt, valueContainsNull) =>
+ genHashForMap(ctx, input, result, kt, vt, valueContainsNull)
+ case StructType(fields) => genHashForStruct(ctx, input, result, fields)
+ case udt: UserDefinedType[_] => computeHashWithTailRec(input, udt.sqlType, result, ctx)
}
+ protected def computeHash(
+ input: String,
+ dataType: DataType,
+ result: String,
+ ctx: CodegenContext): String = computeHashWithTailRec(input, dataType, result, ctx)
+
protected def hasherClassName: String
}
@@ -568,3 +619,217 @@ case class CurrentDatabase() extends LeafExpression with Unevaluable {
override def foldable: Boolean = true
override def nullable: Boolean = false
}
+
+/**
+ * Simulates Hive's hashing function at
+ * org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils#hashcode()
+ *
+ * We should use this hash function for both shuffle and bucketing of Hive tables, so that
+ * we can guarantee that shuffle and bucketing have the same data distribution.
+ *
+ * TODO: Support Decimal and date-related types
+ */
+@ExpressionDescription(
+ usage = "_FUNC_(a1, a2, ...) - Returns a hash value of the arguments.")
+case class HiveHash(children: Seq[Expression]) extends HashExpression[Int] {
+ override val seed = 0
+
+ override def dataType: DataType = IntegerType
+
+ override def prettyName: String = "hive-hash"
+
+ override protected def hasherClassName: String = classOf[HiveHasher].getName
+
+ override protected def computeHash(value: Any, dataType: DataType, seed: Int): Int = {
+ HiveHashFunction.hash(value, dataType, seed).toInt
+ }
+
+ override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
+ ev.isNull = "false"
+ val childHash = ctx.freshName("childHash")
+ val childrenHash = children.map { child =>
+ val childGen = child.genCode(ctx)
+ childGen.code + ctx.nullSafeExec(child.nullable, childGen.isNull) {
+ computeHash(childGen.value, child.dataType, childHash, ctx)
+ } + s"${ev.value} = (31 * ${ev.value}) + $childHash;"
+ }.mkString(s"int $childHash = 0;", s"\n$childHash = 0;\n", "")
+
+ ev.copy(code = s"""
+ ${ctx.javaType(dataType)} ${ev.value} = $seed;
+ $childrenHash""")
+ }
+
+ override def eval(input: InternalRow): Int = {
+ var hash = seed
+ var i = 0
+ val len = children.length
+ while (i < len) {
+ hash = (31 * hash) + computeHash(children(i).eval(input), children(i).dataType, hash)
+ i += 1
+ }
+ hash
+ }
+
+ override protected def genHashInt(i: String, result: String): String =
+ s"$result = $hasherClassName.hashInt($i);"
+
+ override protected def genHashLong(l: String, result: String): String =
+ s"$result = $hasherClassName.hashLong($l);"
+
+ override protected def genHashBytes(b: String, result: String): String =
+ s"$result = $hasherClassName.hashUnsafeBytes($b, Platform.BYTE_ARRAY_OFFSET, $b.length);"
+
+ override protected def genHashCalendarInterval(input: String, result: String): String = {
+ s"""
+ $result = (31 * $hasherClassName.hashInt($input.months)) +
+ $hasherClassName.hashLong($input.microseconds);
+ """
+ }
+
+ override protected def genHashString(input: String, result: String): String = {
+ val baseObject = s"$input.getBaseObject()"
+ val baseOffset = s"$input.getBaseOffset()"
+ val numBytes = s"$input.numBytes()"
+ s"$result = $hasherClassName.hashUnsafeBytes($baseObject, $baseOffset, $numBytes);"
+ }
+
+ override protected def genHashForArray(
+ ctx: CodegenContext,
+ input: String,
+ result: String,
+ elementType: DataType,
+ containsNull: Boolean): String = {
+ val index = ctx.freshName("index")
+ val childResult = ctx.freshName("childResult")
+ s"""
+ int $childResult = 0;
+ for (int $index = 0; $index < $input.numElements(); $index++) {
+ $childResult = 0;
+ ${nullSafeElementHash(input, index, containsNull, elementType, childResult, ctx)};
+ $result = (31 * $result) + $childResult;
+ }
+ """
+ }
+
+ override protected def genHashForMap(
+ ctx: CodegenContext,
+ input: String,
+ result: String,
+ keyType: DataType,
+ valueType: DataType,
+ valueContainsNull: Boolean): String = {
+ val index = ctx.freshName("index")
+ val keys = ctx.freshName("keys")
+ val values = ctx.freshName("values")
+ val keyResult = ctx.freshName("keyResult")
+ val valueResult = ctx.freshName("valueResult")
+ s"""
+ final ArrayData $keys = $input.keyArray();
+ final ArrayData $values = $input.valueArray();
+ int $keyResult = 0;
+ int $valueResult = 0;
+ for (int $index = 0; $index < $input.numElements(); $index++) {
+ $keyResult = 0;
+ ${nullSafeElementHash(keys, index, false, keyType, keyResult, ctx)}
+ $valueResult = 0;
+ ${nullSafeElementHash(values, index, valueContainsNull, valueType, valueResult, ctx)}
+ $result += $keyResult ^ $valueResult;
+ }
+ """
+ }
+
+ override protected def genHashForStruct(
+ ctx: CodegenContext,
+ input: String,
+ result: String,
+ fields: Array[StructField]): String = {
+ val localResult = ctx.freshName("localResult")
+ val childResult = ctx.freshName("childResult")
+ fields.zipWithIndex.map { case (field, index) =>
+ s"""
+ $childResult = 0;
+ ${nullSafeElementHash(input, index.toString, field.nullable, field.dataType,
+ childResult, ctx)}
+ $localResult = (31 * $localResult) + $childResult;
+ """
+ }.mkString(
+ s"""
+ int $localResult = 0;
+ int $childResult = 0;
+ """,
+ "",
+ s"$result = (31 * $result) + $localResult;"
+ )
+ }
+}
+
+object HiveHashFunction extends InterpretedHashFunction {
+ override protected def hashInt(i: Int, seed: Long): Long = {
+ HiveHasher.hashInt(i)
+ }
+
+ override protected def hashLong(l: Long, seed: Long): Long = {
+ HiveHasher.hashLong(l)
+ }
+
+ override protected def hashUnsafeBytes(base: AnyRef, offset: Long, len: Int, seed: Long): Long = {
+ HiveHasher.hashUnsafeBytes(base, offset, len)
+ }
+
+ override def hash(value: Any, dataType: DataType, seed: Long): Long = {
+ value match {
+ case null => 0
+ case array: ArrayData =>
+ val elementType = dataType match {
+ case udt: UserDefinedType[_] => udt.sqlType.asInstanceOf[ArrayType].elementType
+ case ArrayType(et, _) => et
+ }
+
+ var result = 0
+ var i = 0
+ val length = array.numElements()
+ while (i < length) {
+ result = (31 * result) + hash(array.get(i, elementType), elementType, 0).toInt
+ i += 1
+ }
+ result
+
+ case map: MapData =>
+ val (kt, vt) = dataType match {
+ case udt: UserDefinedType[_] =>
+ val mapType = udt.sqlType.asInstanceOf[MapType]
+ mapType.keyType -> mapType.valueType
+ case MapType(_kt, _vt, _) => _kt -> _vt
+ }
+ val keys = map.keyArray()
+ val values = map.valueArray()
+
+ var result = 0
+ var i = 0
+ val length = map.numElements()
+ while (i < length) {
+ result += hash(keys.get(i, kt), kt, 0).toInt ^ hash(values.get(i, vt), vt, 0).toInt
+ i += 1
+ }
+ result
+
+ case struct: InternalRow =>
+ val types: Array[DataType] = dataType match {
+ case udt: UserDefinedType[_] =>
+ udt.sqlType.asInstanceOf[StructType].map(_.dataType).toArray
+ case StructType(fields) => fields.map(_.dataType)
+ }
+
+ var result = 0
+ var i = 0
+ val length = struct.numFields
+ while (i < length) {
+ result = (31 * result) + hash(struct.get(i, types(i)), types(i), seed + 1).toInt
+ i += 1
+ }
+ result
+
+ case _ => super.hash(value, dataType, seed)
+ }
+ }
+}
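The interpreted entry point above can be exercised directly. A hedged sketch of the null, int, and string behavior that follows from the code in this file:

```scala
import org.apache.spark.sql.catalyst.expressions.HiveHashFunction
import org.apache.spark.sql.types.{IntegerType, StringType}
import org.apache.spark.unsafe.types.UTF8String

// Nulls hash to 0 and ints hash to themselves, per HiveHasher.
assert(HiveHashFunction.hash(null, IntegerType, seed = 0L) == 0L)
assert(HiveHashFunction.hash(42, IntegerType, seed = 0L) == 42L)

// Strings go through the 31-based byte polynomial.
println(HiveHashFunction.hash(UTF8String.fromString("val_84"), StringType, seed = 0L))
```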
diff --git a/sql/catalyst/src/test/java/org/apache/spark/sql/catalyst/expressions/HiveHasherSuite.java b/sql/catalyst/src/test/java/org/apache/spark/sql/catalyst/expressions/HiveHasherSuite.java
new file mode 100644
index 0000000000..67a5eb0c7f
--- /dev/null
+++ b/sql/catalyst/src/test/java/org/apache/spark/sql/catalyst/expressions/HiveHasherSuite.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions;
+
+import org.apache.spark.unsafe.Platform;
+import org.apache.spark.unsafe.types.UTF8String;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.nio.charset.StandardCharsets;
+import java.util.HashSet;
+import java.util.Random;
+import java.util.Set;
+
+public class HiveHasherSuite {
+ private static final HiveHasher hasher = new HiveHasher();
+
+ @Test
+ public void testKnownIntegerInputs() {
+ int[] inputs = {0, Integer.MIN_VALUE, Integer.MAX_VALUE, 593689054, -189366624};
+ for (int input : inputs) {
+ Assert.assertEquals(input, HiveHasher.hashInt(input));
+ }
+ }
+
+ @Test
+ public void testKnownLongInputs() {
+ Assert.assertEquals(0, HiveHasher.hashLong(0L));
+ Assert.assertEquals(41, HiveHasher.hashLong(-42L));
+ Assert.assertEquals(42, HiveHasher.hashLong(42L));
+ Assert.assertEquals(-2147483648, HiveHasher.hashLong(Long.MIN_VALUE));
+ Assert.assertEquals(-2147483648, HiveHasher.hashLong(Long.MAX_VALUE));
+ }
+
+ @Test
+ public void testKnownStringAndIntInputs() {
+ int[] inputs = {84, 19, 8};
+ int[] expected = {-823832826, -823835053, 111972242};
+
+ for (int i = 0; i < inputs.length; i++) {
+ UTF8String s = UTF8String.fromString("val_" + inputs[i]);
+ int hash = HiveHasher.hashUnsafeBytes(s.getBaseObject(), s.getBaseOffset(), s.numBytes());
+ Assert.assertEquals(expected[i], ((31 * inputs[i]) + hash));
+ }
+ }
+
+ @Test
+ public void randomizedStressTest() {
+ int size = 65536;
+ Random rand = new Random();
+
+ // A set used to track collision rate.
+ Set<Integer> hashcodes = new HashSet<>();
+ for (int i = 0; i < size; i++) {
+ int vint = rand.nextInt();
+ long lint = rand.nextLong();
+ Assert.assertEquals(HiveHasher.hashInt(vint), HiveHasher.hashInt(vint));
+ Assert.assertEquals(HiveHasher.hashLong(lint), HiveHasher.hashLong(lint));
+
+ hashcodes.add(HiveHasher.hashLong(lint));
+ }
+
+ // A very loose bound.
+ Assert.assertTrue(hashcodes.size() > size * 0.95);
+ }
+
+ @Test
+ public void randomizedStressTestBytes() {
+ int size = 65536;
+ Random rand = new Random();
+
+ // A set used to track collision rate.
+ Set<Integer> hashcodes = new HashSet<>();
+ for (int i = 0; i < size; i++) {
+ int byteArrSize = rand.nextInt(100) * 8;
+ byte[] bytes = new byte[byteArrSize];
+ rand.nextBytes(bytes);
+
+ Assert.assertEquals(
+ HiveHasher.hashUnsafeBytes(bytes, Platform.BYTE_ARRAY_OFFSET, byteArrSize),
+ HiveHasher.hashUnsafeBytes(bytes, Platform.BYTE_ARRAY_OFFSET, byteArrSize));
+
+ hashcodes.add(HiveHasher.hashUnsafeBytes(
+ bytes, Platform.BYTE_ARRAY_OFFSET, byteArrSize));
+ }
+
+ // A very loose bound.
+ Assert.assertTrue(hashcodes.size() > size * 0.95);
+ }
+
+ @Test
+ public void randomizedStressTestPaddedStrings() {
+ int size = 64000;
+ // A set used to track collision rate.
+ Set<Integer> hashcodes = new HashSet<>();
+ for (int i = 0; i < size; i++) {
+ int byteArrSize = 8;
+ byte[] strBytes = String.valueOf(i).getBytes(StandardCharsets.UTF_8);
+ byte[] paddedBytes = new byte[byteArrSize];
+ System.arraycopy(strBytes, 0, paddedBytes, 0, strBytes.length);
+
+ Assert.assertEquals(
+ HiveHasher.hashUnsafeBytes(paddedBytes, Platform.BYTE_ARRAY_OFFSET, byteArrSize),
+ HiveHasher.hashUnsafeBytes(paddedBytes, Platform.BYTE_ARRAY_OFFSET, byteArrSize));
+
+ hashcodes.add(HiveHasher.hashUnsafeBytes(
+ paddedBytes, Platform.BYTE_ARRAY_OFFSET, byteArrSize));
+ }
+
+ // A very loose bound.
+ Assert.assertTrue(hashcodes.size() > size * 0.95);
+ }
+}
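These fixed vectors pin `HiveHasher` to Hive's observed outputs. The eventual consumer is bucket assignment, so for context, here is a hedged sketch of Hive's bucket-id formula, `(hash & Integer.MAX_VALUE) % numBuckets`; the helper name is hypothetical, and nothing in this patch wires it up yet:

```scala
import org.apache.spark.sql.catalyst.expressions.HiveHasher

// Hypothetical helper, not part of this patch: Hive masks off the sign bit
// and takes the modulus to map a hash code to a bucket id.
def hiveBucketId(hashCode: Int, numBuckets: Int): Int =
  (hashCode & Integer.MAX_VALUE) % numBuckets

println(hiveBucketId(HiveHasher.hashInt(42), 8)) // bucket for an int key of 42
```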
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/HashBenchmark.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/HashBenchmark.scala
index c6a1a2be0d..2d94b66a1e 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/HashBenchmark.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/HashBenchmark.scala
@@ -42,8 +42,8 @@ object HashBenchmark {
val benchmark = new Benchmark("Hash For " + name, iters * numRows)
benchmark.addCase("interpreted version") { _: Int =>
+ var sum = 0
for (_ <- 0L until iters) {
- var sum = 0
var i = 0
while (i < numRows) {
sum += rows(i).hashCode()
@@ -54,8 +54,8 @@ object HashBenchmark {
val getHashCode = UnsafeProjection.create(new Murmur3Hash(attrs) :: Nil, attrs)
benchmark.addCase("codegen version") { _: Int =>
+ var sum = 0
for (_ <- 0L until iters) {
- var sum = 0
var i = 0
while (i < numRows) {
sum += getHashCode(rows(i)).getInt(0)
@@ -66,8 +66,8 @@ object HashBenchmark {
val getHashCode64b = UnsafeProjection.create(new XxHash64(attrs) :: Nil, attrs)
benchmark.addCase("codegen version 64-bit") { _: Int =>
+ var sum = 0
for (_ <- 0L until iters) {
- var sum = 0
var i = 0
while (i < numRows) {
sum += getHashCode64b(rows(i)).getInt(0)
@@ -76,30 +76,44 @@ object HashBenchmark {
}
}
+ val getHiveHashCode = UnsafeProjection.create(new HiveHash(attrs) :: Nil, attrs)
+ benchmark.addCase("codegen HiveHash version") { _: Int =>
+ var sum = 0
+ for (_ <- 0L until iters) {
+ var i = 0
+ while (i < numRows) {
+ sum += getHiveHashCode(rows(i)).getInt(0)
+ i += 1
+ }
+ }
+ }
+
benchmark.run()
}
def main(args: Array[String]): Unit = {
val singleInt = new StructType().add("i", IntegerType)
/*
- Intel(R) Core(TM) i7-4750HQ CPU @ 2.00GHz
- Hash For single ints: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
- -------------------------------------------------------------------------------------------
- interpreted version 1006 / 1011 133.4 7.5 1.0X
- codegen version 1835 / 1839 73.1 13.7 0.5X
- codegen version 64-bit 1627 / 1628 82.5 12.1 0.6X
- */
+ Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz
+ Hash For single ints: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
+ ------------------------------------------------------------------------------------------------
+ interpreted version 3262 / 3267 164.6 6.1 1.0X
+ codegen version 6448 / 6718 83.3 12.0 0.5X
+ codegen version 64-bit 6088 / 6154 88.2 11.3 0.5X
+ codegen HiveHash version 4732 / 4745 113.5 8.8 0.7X
+ */
test("single ints", singleInt, 1 << 15, 1 << 14)
val singleLong = new StructType().add("i", LongType)
/*
- Intel(R) Core(TM) i7-4750HQ CPU @ 2.00GHz
- Hash For single longs: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
- -------------------------------------------------------------------------------------------
- interpreted version 1196 / 1209 112.2 8.9 1.0X
- codegen version 2178 / 2181 61.6 16.2 0.5X
- codegen version 64-bit 1752 / 1753 76.6 13.1 0.7X
- */
+ Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz
+ Hash For single longs: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
+ ------------------------------------------------------------------------------------------------
+ interpreted version 3716 / 3726 144.5 6.9 1.0X
+ codegen version 7706 / 7732 69.7 14.4 0.5X
+ codegen version 64-bit 6370 / 6399 84.3 11.9 0.6X
+ codegen HiveHash version 4924 / 5026 109.0 9.2 0.8X
+ */
test("single longs", singleLong, 1 << 15, 1 << 14)
val normal = new StructType()
@@ -118,13 +132,14 @@ object HashBenchmark {
.add("date", DateType)
.add("timestamp", TimestampType)
/*
- Intel(R) Core(TM) i7-4750HQ CPU @ 2.00GHz
- Hash For normal: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
- -------------------------------------------------------------------------------------------
- interpreted version 2713 / 2715 0.8 1293.5 1.0X
- codegen version 2015 / 2018 1.0 960.9 1.3X
- codegen version 64-bit 735 / 738 2.9 350.7 3.7X
- */
+ Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz
+ Hash For normal: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
+ ------------------------------------------------------------------------------------------------
+ interpreted version 2985 / 3013 0.7 1423.4 1.0X
+ codegen version 2422 / 2434 0.9 1155.1 1.2X
+ codegen version 64-bit 856 / 920 2.5 408.0 3.5X
+ codegen HiveHash version 4501 / 4979 0.5 2146.4 0.7X
+ */
test("normal", normal, 1 << 10, 1 << 11)
val arrayOfInt = ArrayType(IntegerType)
@@ -132,13 +147,14 @@ object HashBenchmark {
.add("array", arrayOfInt)
.add("arrayOfArray", ArrayType(arrayOfInt))
/*
- Intel(R) Core(TM) i7-4750HQ CPU @ 2.00GHz
- Hash For array: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
- -------------------------------------------------------------------------------------------
- interpreted version 1498 / 1499 0.1 11432.1 1.0X
- codegen version 2642 / 2643 0.0 20158.4 0.6X
- codegen version 64-bit 2421 / 2424 0.1 18472.5 0.6X
- */
+ Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz
+ Hash For array: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
+ ------------------------------------------------------------------------------------------------
+ interpreted version 3100 / 3555 0.0 23651.8 1.0X
+ codegen version 5779 / 5865 0.0 44088.4 0.5X
+ codegen version 64-bit 4738 / 4821 0.0 36151.7 0.7X
+ codegen HiveHash version 2200 / 2246 0.1 16785.9 1.4X
+ */
test("array", array, 1 << 8, 1 << 9)
val mapOfInt = MapType(IntegerType, IntegerType)
@@ -146,13 +162,14 @@ object HashBenchmark {
.add("map", mapOfInt)
.add("mapOfMap", MapType(IntegerType, mapOfInt))
/*
- Intel(R) Core(TM) i7-4750HQ CPU @ 2.00GHz
- Hash For map: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
- -------------------------------------------------------------------------------------------
- interpreted version 1612 / 1618 0.0 393553.4 1.0X
- codegen version 149 / 150 0.0 36381.2 10.8X
- codegen version 64-bit 144 / 145 0.0 35122.1 11.2X
- */
+ Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz
+ Hash For map: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
+ ------------------------------------------------------------------------------------------------
+ interpreted version 0 / 0 48.1 20.8 1.0X
+ codegen version 257 / 275 0.0 62768.7 0.0X
+ codegen version 64-bit 226 / 240 0.0 55224.5 0.0X
+ codegen HiveHash version 89 / 96 0.0 21708.8 0.0X
+ */
test("map", map, 1 << 6, 1 << 6)
}
}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/HashByteArrayBenchmark.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/HashByteArrayBenchmark.scala
index 53f21a8442..2a753a0c84 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/HashByteArrayBenchmark.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/HashByteArrayBenchmark.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql
import java.util.Random
-import org.apache.spark.sql.catalyst.expressions.XXH64
+import org.apache.spark.sql.catalyst.expressions.{HiveHasher, XXH64}
import org.apache.spark.unsafe.Platform
import org.apache.spark.unsafe.hash.Murmur3_x86_32
import org.apache.spark.util.Benchmark
@@ -38,8 +38,8 @@ object HashByteArrayBenchmark {
val benchmark = new Benchmark("Hash byte arrays with length " + length, iters * numArrays)
benchmark.addCase("Murmur3_x86_32") { _: Int =>
+ var sum = 0L
for (_ <- 0L until iters) {
- var sum = 0
var i = 0
while (i < numArrays) {
sum += Murmur3_x86_32.hashUnsafeBytes(arrays(i), Platform.BYTE_ARRAY_OFFSET, length, 42)
@@ -49,8 +49,8 @@ object HashByteArrayBenchmark {
}
benchmark.addCase("xxHash 64-bit") { _: Int =>
+ var sum = 0L
for (_ <- 0L until iters) {
- var sum = 0L
var i = 0
while (i < numArrays) {
sum += XXH64.hashUnsafeBytes(arrays(i), Platform.BYTE_ARRAY_OFFSET, length, 42)
@@ -59,90 +59,110 @@ object HashByteArrayBenchmark {
}
}
+ benchmark.addCase("HiveHasher") { _: Int =>
+ var sum = 0L
+ for (_ <- 0L until iters) {
+ var i = 0
+ while (i < numArrays) {
+ sum += HiveHasher.hashUnsafeBytes(arrays(i), Platform.BYTE_ARRAY_OFFSET, length)
+ i += 1
+ }
+ }
+ }
+
benchmark.run()
}
def main(args: Array[String]): Unit = {
/*
- Intel(R) Core(TM) i7-4750HQ CPU @ 2.00GHz
- Hash byte arrays with length 8: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
- -------------------------------------------------------------------------------------------
- Murmur3_x86_32 11 / 12 185.1 5.4 1.0X
- xxHash 64-bit 17 / 18 120.0 8.3 0.6X
+ Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz
+ Hash byte arrays with length 8: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
+ ------------------------------------------------------------------------------------------------
+ Murmur3_x86_32 12 / 16 174.3 5.7 1.0X
+ xxHash 64-bit 17 / 22 120.0 8.3 0.7X
+ HiveHasher 13 / 15 162.1 6.2 0.9X
*/
test(8, 42L, 1 << 10, 1 << 11)
/*
- Intel(R) Core(TM) i7-4750HQ CPU @ 2.00GHz
- Hash byte arrays with length 16: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
- -------------------------------------------------------------------------------------------
- Murmur3_x86_32 18 / 18 118.6 8.4 1.0X
- xxHash 64-bit 20 / 21 102.5 9.8 0.9X
+ Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz
+ Hash byte arrays with length 16: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
+ ------------------------------------------------------------------------------------------------
+ Murmur3_x86_32 19 / 22 107.6 9.3 1.0X
+ xxHash 64-bit 20 / 24 104.6 9.6 1.0X
+ HiveHasher 24 / 28 87.0 11.5 0.8X
*/
test(16, 42L, 1 << 10, 1 << 11)
/*
- Intel(R) Core(TM) i7-4750HQ CPU @ 2.00GHz
- Hash byte arrays with length 24: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
- -------------------------------------------------------------------------------------------
- Murmur3_x86_32 24 / 24 86.6 11.5 1.0X
- xxHash 64-bit 23 / 23 93.2 10.7 1.1X
+ Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz
+ Hash byte arrays with length 24: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
+ ------------------------------------------------------------------------------------------------
+ Murmur3_x86_32 28 / 32 74.8 13.4 1.0X
+ xxHash 64-bit 24 / 29 87.3 11.5 1.2X
+ HiveHasher 36 / 41 57.7 17.3 0.8X
*/
test(24, 42L, 1 << 10, 1 << 11)
// Add 31 to all arrays to create worse case alignment for xxHash.
/*
- Intel(R) Core(TM) i7-4750HQ CPU @ 2.00GHz
- Hash byte arrays with length 31: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
- -------------------------------------------------------------------------------------------
- Murmur3_x86_32 38 / 39 54.7 18.3 1.0X
- xxHash 64-bit 33 / 33 64.4 15.5 1.2X
+ Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz
+ Hash byte arrays with length 31: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
+ ------------------------------------------------------------------------------------------------
+ Murmur3_x86_32 41 / 45 51.1 19.6 1.0X
+ xxHash 64-bit 36 / 44 58.8 17.0 1.2X
+ HiveHasher 49 / 54 42.6 23.5 0.8X
*/
test(31, 42L, 1 << 10, 1 << 11)
/*
- Intel(R) Core(TM) i7-4750HQ CPU @ 2.00GHz
- Hash byte arrays with length 95: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
- -------------------------------------------------------------------------------------------
- Murmur3_x86_32 91 / 94 22.9 43.6 1.0X
- xxHash 64-bit 68 / 69 30.6 32.7 1.3X
+ Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz
+ Hash byte arrays with length 95: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
+ ------------------------------------------------------------------------------------------------
+ Murmur3_x86_32 100 / 110 21.0 47.7 1.0X
+ xxHash 64-bit 74 / 78 28.2 35.5 1.3X
+ HiveHasher 189 / 196 11.1 90.3 0.5X
*/
test(64 + 31, 42L, 1 << 10, 1 << 11)
/*
- Intel(R) Core(TM) i7-4750HQ CPU @ 2.00GHz
- Hash byte arrays with length 287: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
- -------------------------------------------------------------------------------------------
- Murmur3_x86_32 268 / 268 7.8 127.6 1.0X
- xxHash 64-bit 108 / 109 19.4 51.6 2.5X
+ Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz
+ Hash byte arrays with length 287: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
+ ------------------------------------------------------------------------------------------------
+ Murmur3_x86_32 299 / 311 7.0 142.4 1.0X
+ xxHash 64-bit 113 / 122 18.5 54.1 2.6X
+ HiveHasher 620 / 624 3.4 295.5 0.5X
*/
test(256 + 31, 42L, 1 << 10, 1 << 11)
/*
- Intel(R) Core(TM) i7-4750HQ CPU @ 2.00GHz
- Hash byte arrays with length 1055: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
- -------------------------------------------------------------------------------------------
- Murmur3_x86_32 942 / 945 2.2 449.4 1.0X
- xxHash 64-bit 276 / 276 7.6 131.4 3.4X
+ Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz
+ Hash byte arrays with length 1055: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
+ ------------------------------------------------------------------------------------------------
+ Murmur3_x86_32 1068 / 1070 2.0 509.1 1.0X
+ xxHash 64-bit 306 / 315 6.9 145.9 3.5X
+ HiveHasher 2316 / 2369 0.9 1104.3 0.5X
*/
test(1024 + 31, 42L, 1 << 10, 1 << 11)
/*
- Intel(R) Core(TM) i7-4750HQ CPU @ 2.00GHz
- Hash byte arrays with length 2079: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
- -------------------------------------------------------------------------------------------
- Murmur3_x86_32 1839 / 1843 1.1 876.8 1.0X
- xxHash 64-bit 445 / 448 4.7 212.1 4.1X
+ Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz
+ Hash byte arrays with length 2079: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
+ ------------------------------------------------------------------------------------------------
+ Murmur3_x86_32 2252 / 2274 0.9 1074.1 1.0X
+ xxHash 64-bit 534 / 580 3.9 254.6 4.2X
+ HiveHasher 4739 / 4786 0.4 2259.8 0.5X
*/
test(2048 + 31, 42L, 1 << 10, 1 << 11)
/*
- Intel(R) Core(TM) i7-4750HQ CPU @ 2.00GHz
- Hash byte arrays with length 8223: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
- -------------------------------------------------------------------------------------------
- Murmur3_x86_32 7307 / 7310 0.3 3484.4 1.0X
- xxHash 64-bit 1487 / 1488 1.4 709.1 4.9X
- */
+ Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz
+ Hash byte arrays with length 8223: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
+ ------------------------------------------------------------------------------------------------
+ Murmur3_x86_32 9249 / 9586 0.2 4410.5 1.0X
+ xxHash 64-bit 2897 / 3241 0.7 1381.6 3.2X
+ HiveHasher 19392 / 20211 0.1 9246.6 0.5X
+ */
test(8192 + 31, 42L, 1 << 10, 1 << 11)
}
}
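To reproduce numbers like those recorded in the comments above, both benchmark objects expose a `main`; a hedged way to drive them, assuming the catalyst test classes are on the classpath (e.g. from a test harness or REPL):

```scala
// Runs every benchmark case and prints a results table like the ones above.
org.apache.spark.sql.HashBenchmark.main(Array.empty[String])
org.apache.spark.sql.HashByteArrayBenchmark.main(Array.empty[String])
```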
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MiscFunctionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MiscFunctionsSuite.scala
index 33916c0891..13ce588462 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MiscFunctionsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MiscFunctionsSuite.scala
@@ -145,7 +145,7 @@ class MiscFunctionsSuite extends SparkFunSuite with ExpressionEvalHelper {
val inputGenerator = RandomDataGenerator.forType(inputSchema, nullable = false).get
val encoder = RowEncoder(inputSchema)
val seed = scala.util.Random.nextInt()
- test(s"murmur3/xxHash64 hash: ${inputSchema.simpleString}") {
+ test(s"murmur3/xxHash64/hive hash: ${inputSchema.simpleString}") {
for (_ <- 1 to 10) {
val input = encoder.toRow(inputGenerator.apply().asInstanceOf[Row]).asInstanceOf[UnsafeRow]
val literals = input.toSeq(inputSchema).zip(inputSchema.map(_.dataType)).map {
@@ -154,6 +154,7 @@ class MiscFunctionsSuite extends SparkFunSuite with ExpressionEvalHelper {
// Only test that the interpreted version gives the same result as the codegen version.
checkEvaluation(Murmur3Hash(literals, seed), Murmur3Hash(literals, seed).eval())
checkEvaluation(XxHash64(literals, seed), XxHash64(literals, seed).eval())
+ checkEvaluation(HiveHash(literals), HiveHash(literals).eval())
}
}
}