diff options
Diffstat (limited to 'sql/catalyst/src')
5 files changed, 582 insertions, 151 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/misc.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/misc.scala index dbb52a4bb1..138ef2a1dc 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/misc.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/misc.scala @@ -259,7 +259,7 @@ abstract class HashExpression[E] extends Expression { $childrenHash""") } - private def nullSafeElementHash( + protected def nullSafeElementHash( input: String, index: String, nullable: Boolean, @@ -276,76 +276,127 @@ abstract class HashExpression[E] extends Expression { } } - @tailrec - private def computeHash( + protected def genHashInt(i: String, result: String): String = + s"$result = $hasherClassName.hashInt($i, $result);" + + protected def genHashLong(l: String, result: String): String = + s"$result = $hasherClassName.hashLong($l, $result);" + + protected def genHashBytes(b: String, result: String): String = { + val offset = "Platform.BYTE_ARRAY_OFFSET" + s"$result = $hasherClassName.hashUnsafeBytes($b, $offset, $b.length, $result);" + } + + protected def genHashBoolean(input: String, result: String): String = + genHashInt(s"$input ? 1 : 0", result) + + protected def genHashFloat(input: String, result: String): String = + genHashInt(s"Float.floatToIntBits($input)", result) + + protected def genHashDouble(input: String, result: String): String = + genHashLong(s"Double.doubleToLongBits($input)", result) + + protected def genHashDecimal( + ctx: CodegenContext, + d: DecimalType, input: String, - dataType: DataType, - result: String, - ctx: CodegenContext): String = { - val hasher = hasherClassName - - def hashInt(i: String): String = s"$result = $hasher.hashInt($i, $result);" - def hashLong(l: String): String = s"$result = $hasher.hashLong($l, $result);" - def hashBytes(b: String): String = - s"$result = $hasher.hashUnsafeBytes($b, Platform.BYTE_ARRAY_OFFSET, $b.length, $result);" - - dataType match { - case NullType => "" - case BooleanType => hashInt(s"$input ? 1 : 0") - case ByteType | ShortType | IntegerType | DateType => hashInt(input) - case LongType | TimestampType => hashLong(input) - case FloatType => hashInt(s"Float.floatToIntBits($input)") - case DoubleType => hashLong(s"Double.doubleToLongBits($input)") - case d: DecimalType => - if (d.precision <= Decimal.MAX_LONG_DIGITS) { - hashLong(s"$input.toUnscaledLong()") - } else { - val bytes = ctx.freshName("bytes") - s""" + result: String): String = { + if (d.precision <= Decimal.MAX_LONG_DIGITS) { + genHashLong(s"$input.toUnscaledLong()", result) + } else { + val bytes = ctx.freshName("bytes") + s""" final byte[] $bytes = $input.toJavaBigDecimal().unscaledValue().toByteArray(); - ${hashBytes(bytes)} + ${genHashBytes(bytes, result)} """ + } + } + + protected def genHashCalendarInterval(input: String, result: String): String = { + val microsecondsHash = s"$hasherClassName.hashLong($input.microseconds, $result)" + s"$result = $hasherClassName.hashInt($input.months, $microsecondsHash);" + } + + protected def genHashString(input: String, result: String): String = { + val baseObject = s"$input.getBaseObject()" + val baseOffset = s"$input.getBaseOffset()" + val numBytes = s"$input.numBytes()" + s"$result = $hasherClassName.hashUnsafeBytes($baseObject, $baseOffset, $numBytes, $result);" + } + + protected def genHashForMap( + ctx: CodegenContext, + input: String, + result: String, + keyType: DataType, + valueType: DataType, + valueContainsNull: Boolean): String = { + val index = ctx.freshName("index") + val keys = ctx.freshName("keys") + val values = ctx.freshName("values") + s""" + final ArrayData $keys = $input.keyArray(); + final ArrayData $values = $input.valueArray(); + for (int $index = 0; $index < $input.numElements(); $index++) { + ${nullSafeElementHash(keys, index, false, keyType, result, ctx)} + ${nullSafeElementHash(values, index, valueContainsNull, valueType, result, ctx)} } - case CalendarIntervalType => - val microsecondsHash = s"$hasher.hashLong($input.microseconds, $result)" - s"$result = $hasher.hashInt($input.months, $microsecondsHash);" - case BinaryType => hashBytes(input) - case StringType => - val baseObject = s"$input.getBaseObject()" - val baseOffset = s"$input.getBaseOffset()" - val numBytes = s"$input.numBytes()" - s"$result = $hasher.hashUnsafeBytes($baseObject, $baseOffset, $numBytes, $result);" - - case ArrayType(et, containsNull) => - val index = ctx.freshName("index") - s""" - for (int $index = 0; $index < $input.numElements(); $index++) { - ${nullSafeElementHash(input, index, containsNull, et, result, ctx)} - } - """ - - case MapType(kt, vt, valueContainsNull) => - val index = ctx.freshName("index") - val keys = ctx.freshName("keys") - val values = ctx.freshName("values") - s""" - final ArrayData $keys = $input.keyArray(); - final ArrayData $values = $input.valueArray(); - for (int $index = 0; $index < $input.numElements(); $index++) { - ${nullSafeElementHash(keys, index, false, kt, result, ctx)} - ${nullSafeElementHash(values, index, valueContainsNull, vt, result, ctx)} - } - """ + """ + } + + protected def genHashForArray( + ctx: CodegenContext, + input: String, + result: String, + elementType: DataType, + containsNull: Boolean): String = { + val index = ctx.freshName("index") + s""" + for (int $index = 0; $index < $input.numElements(); $index++) { + ${nullSafeElementHash(input, index, containsNull, elementType, result, ctx)} + } + """ + } - case StructType(fields) => - fields.zipWithIndex.map { case (field, index) => - nullSafeElementHash(input, index.toString, field.nullable, field.dataType, result, ctx) - }.mkString("\n") + protected def genHashForStruct( + ctx: CodegenContext, + input: String, + result: String, + fields: Array[StructField]): String = { + fields.zipWithIndex.map { case (field, index) => + nullSafeElementHash(input, index.toString, field.nullable, field.dataType, result, ctx) + }.mkString("\n") + } - case udt: UserDefinedType[_] => computeHash(input, udt.sqlType, result, ctx) - } + @tailrec + private def computeHashWithTailRec( + input: String, + dataType: DataType, + result: String, + ctx: CodegenContext): String = dataType match { + case NullType => "" + case BooleanType => genHashBoolean(input, result) + case ByteType | ShortType | IntegerType | DateType => genHashInt(input, result) + case LongType | TimestampType => genHashLong(input, result) + case FloatType => genHashFloat(input, result) + case DoubleType => genHashDouble(input, result) + case d: DecimalType => genHashDecimal(ctx, d, input, result) + case CalendarIntervalType => genHashCalendarInterval(input, result) + case BinaryType => genHashBytes(input, result) + case StringType => genHashString(input, result) + case ArrayType(et, containsNull) => genHashForArray(ctx, input, result, et, containsNull) + case MapType(kt, vt, valueContainsNull) => + genHashForMap(ctx, input, result, kt, vt, valueContainsNull) + case StructType(fields) => genHashForStruct(ctx, input, result, fields) + case udt: UserDefinedType[_] => computeHashWithTailRec(input, udt.sqlType, result, ctx) } + protected def computeHash( + input: String, + dataType: DataType, + result: String, + ctx: CodegenContext): String = computeHashWithTailRec(input, dataType, result, ctx) + protected def hasherClassName: String } @@ -568,3 +619,217 @@ case class CurrentDatabase() extends LeafExpression with Unevaluable { override def foldable: Boolean = true override def nullable: Boolean = false } + +/** + * Simulates Hive's hashing function at + * org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils#hashcode() in Hive + * + * We should use this hash function for both shuffle and bucket of Hive tables, so that + * we can guarantee shuffle and bucketing have same data distribution + * + * TODO: Support Decimal and date related types + */ +@ExpressionDescription( + usage = "_FUNC_(a1, a2, ...) - Returns a hash value of the arguments.") +case class HiveHash(children: Seq[Expression]) extends HashExpression[Int] { + override val seed = 0 + + override def dataType: DataType = IntegerType + + override def prettyName: String = "hive-hash" + + override protected def hasherClassName: String = classOf[HiveHasher].getName + + override protected def computeHash(value: Any, dataType: DataType, seed: Int): Int = { + HiveHashFunction.hash(value, dataType, seed).toInt + } + + override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { + ev.isNull = "false" + val childHash = ctx.freshName("childHash") + val childrenHash = children.map { child => + val childGen = child.genCode(ctx) + childGen.code + ctx.nullSafeExec(child.nullable, childGen.isNull) { + computeHash(childGen.value, child.dataType, childHash, ctx) + } + s"${ev.value} = (31 * ${ev.value}) + $childHash;" + }.mkString(s"int $childHash = 0;", s"\n$childHash = 0;\n", "") + + ev.copy(code = s""" + ${ctx.javaType(dataType)} ${ev.value} = $seed; + $childrenHash""") + } + + override def eval(input: InternalRow): Int = { + var hash = seed + var i = 0 + val len = children.length + while (i < len) { + hash = (31 * hash) + computeHash(children(i).eval(input), children(i).dataType, hash) + i += 1 + } + hash + } + + override protected def genHashInt(i: String, result: String): String = + s"$result = $hasherClassName.hashInt($i);" + + override protected def genHashLong(l: String, result: String): String = + s"$result = $hasherClassName.hashLong($l);" + + override protected def genHashBytes(b: String, result: String): String = + s"$result = $hasherClassName.hashUnsafeBytes($b, Platform.BYTE_ARRAY_OFFSET, $b.length);" + + override protected def genHashCalendarInterval(input: String, result: String): String = { + s""" + $result = (31 * $hasherClassName.hashInt($input.months)) + + $hasherClassName.hashLong($input.microseconds);" + """ + } + + override protected def genHashString(input: String, result: String): String = { + val baseObject = s"$input.getBaseObject()" + val baseOffset = s"$input.getBaseOffset()" + val numBytes = s"$input.numBytes()" + s"$result = $hasherClassName.hashUnsafeBytes($baseObject, $baseOffset, $numBytes);" + } + + override protected def genHashForArray( + ctx: CodegenContext, + input: String, + result: String, + elementType: DataType, + containsNull: Boolean): String = { + val index = ctx.freshName("index") + val childResult = ctx.freshName("childResult") + s""" + int $childResult = 0; + for (int $index = 0; $index < $input.numElements(); $index++) { + $childResult = 0; + ${nullSafeElementHash(input, index, containsNull, elementType, childResult, ctx)}; + $result = (31 * $result) + $childResult; + } + """ + } + + override protected def genHashForMap( + ctx: CodegenContext, + input: String, + result: String, + keyType: DataType, + valueType: DataType, + valueContainsNull: Boolean): String = { + val index = ctx.freshName("index") + val keys = ctx.freshName("keys") + val values = ctx.freshName("values") + val keyResult = ctx.freshName("keyResult") + val valueResult = ctx.freshName("valueResult") + s""" + final ArrayData $keys = $input.keyArray(); + final ArrayData $values = $input.valueArray(); + int $keyResult = 0; + int $valueResult = 0; + for (int $index = 0; $index < $input.numElements(); $index++) { + $keyResult = 0; + ${nullSafeElementHash(keys, index, false, keyType, keyResult, ctx)} + $valueResult = 0; + ${nullSafeElementHash(values, index, valueContainsNull, valueType, valueResult, ctx)} + $result += $keyResult ^ $valueResult; + } + """ + } + + override protected def genHashForStruct( + ctx: CodegenContext, + input: String, + result: String, + fields: Array[StructField]): String = { + val localResult = ctx.freshName("localResult") + val childResult = ctx.freshName("childResult") + fields.zipWithIndex.map { case (field, index) => + s""" + $childResult = 0; + ${nullSafeElementHash(input, index.toString, field.nullable, field.dataType, + childResult, ctx)} + $localResult = (31 * $localResult) + $childResult; + """ + }.mkString( + s""" + int $localResult = 0; + int $childResult = 0; + """, + "", + s"$result = (31 * $result) + $localResult;" + ) + } +} + +object HiveHashFunction extends InterpretedHashFunction { + override protected def hashInt(i: Int, seed: Long): Long = { + HiveHasher.hashInt(i) + } + + override protected def hashLong(l: Long, seed: Long): Long = { + HiveHasher.hashLong(l) + } + + override protected def hashUnsafeBytes(base: AnyRef, offset: Long, len: Int, seed: Long): Long = { + HiveHasher.hashUnsafeBytes(base, offset, len) + } + + override def hash(value: Any, dataType: DataType, seed: Long): Long = { + value match { + case null => 0 + case array: ArrayData => + val elementType = dataType match { + case udt: UserDefinedType[_] => udt.sqlType.asInstanceOf[ArrayType].elementType + case ArrayType(et, _) => et + } + + var result = 0 + var i = 0 + val length = array.numElements() + while (i < length) { + result = (31 * result) + hash(array.get(i, elementType), elementType, 0).toInt + i += 1 + } + result + + case map: MapData => + val (kt, vt) = dataType match { + case udt: UserDefinedType[_] => + val mapType = udt.sqlType.asInstanceOf[MapType] + mapType.keyType -> mapType.valueType + case MapType(_kt, _vt, _) => _kt -> _vt + } + val keys = map.keyArray() + val values = map.valueArray() + + var result = 0 + var i = 0 + val length = map.numElements() + while (i < length) { + result += hash(keys.get(i, kt), kt, 0).toInt ^ hash(values.get(i, vt), vt, 0).toInt + i += 1 + } + result + + case struct: InternalRow => + val types: Array[DataType] = dataType match { + case udt: UserDefinedType[_] => + udt.sqlType.asInstanceOf[StructType].map(_.dataType).toArray + case StructType(fields) => fields.map(_.dataType) + } + + var result = 0 + var i = 0 + val length = struct.numFields + while (i < length) { + result = (31 * result) + hash(struct.get(i, types(i)), types(i), seed + 1).toInt + i += 1 + } + result + + case _ => super.hash(value, dataType, seed) + } + } +} diff --git a/sql/catalyst/src/test/java/org/apache/spark/sql/catalyst/expressions/HiveHasherSuite.java b/sql/catalyst/src/test/java/org/apache/spark/sql/catalyst/expressions/HiveHasherSuite.java new file mode 100644 index 0000000000..67a5eb0c7f --- /dev/null +++ b/sql/catalyst/src/test/java/org/apache/spark/sql/catalyst/expressions/HiveHasherSuite.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.expressions; + +import org.apache.spark.unsafe.Platform; +import org.apache.spark.unsafe.types.UTF8String; +import org.junit.Assert; +import org.junit.Test; + +import java.nio.charset.StandardCharsets; +import java.util.HashSet; +import java.util.Random; +import java.util.Set; + +public class HiveHasherSuite { + private final static HiveHasher hasher = new HiveHasher(); + + @Test + public void testKnownIntegerInputs() { + int[] inputs = {0, Integer.MIN_VALUE, Integer.MAX_VALUE, 593689054, -189366624}; + for (int input : inputs) { + Assert.assertEquals(input, HiveHasher.hashInt(input)); + } + } + + @Test + public void testKnownLongInputs() { + Assert.assertEquals(0, HiveHasher.hashLong(0L)); + Assert.assertEquals(41, HiveHasher.hashLong(-42L)); + Assert.assertEquals(42, HiveHasher.hashLong(42L)); + Assert.assertEquals(-2147483648, HiveHasher.hashLong(Long.MIN_VALUE)); + Assert.assertEquals(-2147483648, HiveHasher.hashLong(Long.MAX_VALUE)); + } + + @Test + public void testKnownStringAndIntInputs() { + int[] inputs = {84, 19, 8}; + int[] expected = {-823832826, -823835053, 111972242}; + + for (int i = 0; i < inputs.length; i++) { + UTF8String s = UTF8String.fromString("val_" + inputs[i]); + int hash = HiveHasher.hashUnsafeBytes(s.getBaseObject(), s.getBaseOffset(), s.numBytes()); + Assert.assertEquals(expected[i], ((31 * inputs[i]) + hash)); + } + } + + @Test + public void randomizedStressTest() { + int size = 65536; + Random rand = new Random(); + + // A set used to track collision rate. + Set<Integer> hashcodes = new HashSet<>(); + for (int i = 0; i < size; i++) { + int vint = rand.nextInt(); + long lint = rand.nextLong(); + Assert.assertEquals(HiveHasher.hashInt(vint), HiveHasher.hashInt(vint)); + Assert.assertEquals(HiveHasher.hashLong(lint), HiveHasher.hashLong(lint)); + + hashcodes.add(HiveHasher.hashLong(lint)); + } + + // A very loose bound. + Assert.assertTrue(hashcodes.size() > size * 0.95); + } + + @Test + public void randomizedStressTestBytes() { + int size = 65536; + Random rand = new Random(); + + // A set used to track collision rate. + Set<Integer> hashcodes = new HashSet<>(); + for (int i = 0; i < size; i++) { + int byteArrSize = rand.nextInt(100) * 8; + byte[] bytes = new byte[byteArrSize]; + rand.nextBytes(bytes); + + Assert.assertEquals( + HiveHasher.hashUnsafeBytes(bytes, Platform.BYTE_ARRAY_OFFSET, byteArrSize), + HiveHasher.hashUnsafeBytes(bytes, Platform.BYTE_ARRAY_OFFSET, byteArrSize)); + + hashcodes.add(HiveHasher.hashUnsafeBytes( + bytes, Platform.BYTE_ARRAY_OFFSET, byteArrSize)); + } + + // A very loose bound. + Assert.assertTrue(hashcodes.size() > size * 0.95); + } + + @Test + public void randomizedStressTestPaddedStrings() { + int size = 64000; + // A set used to track collision rate. + Set<Integer> hashcodes = new HashSet<>(); + for (int i = 0; i < size; i++) { + int byteArrSize = 8; + byte[] strBytes = String.valueOf(i).getBytes(StandardCharsets.UTF_8); + byte[] paddedBytes = new byte[byteArrSize]; + System.arraycopy(strBytes, 0, paddedBytes, 0, strBytes.length); + + Assert.assertEquals( + HiveHasher.hashUnsafeBytes(paddedBytes, Platform.BYTE_ARRAY_OFFSET, byteArrSize), + HiveHasher.hashUnsafeBytes(paddedBytes, Platform.BYTE_ARRAY_OFFSET, byteArrSize)); + + hashcodes.add(HiveHasher.hashUnsafeBytes( + paddedBytes, Platform.BYTE_ARRAY_OFFSET, byteArrSize)); + } + + // A very loose bound. + Assert.assertTrue(hashcodes.size() > size * 0.95); + } +} diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/HashBenchmark.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/HashBenchmark.scala index c6a1a2be0d..2d94b66a1e 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/HashBenchmark.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/HashBenchmark.scala @@ -42,8 +42,8 @@ object HashBenchmark { val benchmark = new Benchmark("Hash For " + name, iters * numRows) benchmark.addCase("interpreted version") { _: Int => + var sum = 0 for (_ <- 0L until iters) { - var sum = 0 var i = 0 while (i < numRows) { sum += rows(i).hashCode() @@ -54,8 +54,8 @@ object HashBenchmark { val getHashCode = UnsafeProjection.create(new Murmur3Hash(attrs) :: Nil, attrs) benchmark.addCase("codegen version") { _: Int => + var sum = 0 for (_ <- 0L until iters) { - var sum = 0 var i = 0 while (i < numRows) { sum += getHashCode(rows(i)).getInt(0) @@ -66,8 +66,8 @@ object HashBenchmark { val getHashCode64b = UnsafeProjection.create(new XxHash64(attrs) :: Nil, attrs) benchmark.addCase("codegen version 64-bit") { _: Int => + var sum = 0 for (_ <- 0L until iters) { - var sum = 0 var i = 0 while (i < numRows) { sum += getHashCode64b(rows(i)).getInt(0) @@ -76,30 +76,44 @@ object HashBenchmark { } } + val getHiveHashCode = UnsafeProjection.create(new HiveHash(attrs) :: Nil, attrs) + benchmark.addCase("codegen HiveHash version") { _: Int => + var sum = 0 + for (_ <- 0L until iters) { + var i = 0 + while (i < numRows) { + sum += getHiveHashCode(rows(i)).getInt(0) + i += 1 + } + } + } + benchmark.run() } def main(args: Array[String]): Unit = { val singleInt = new StructType().add("i", IntegerType) /* - Intel(R) Core(TM) i7-4750HQ CPU @ 2.00GHz - Hash For single ints: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative - ------------------------------------------------------------------------------------------- - interpreted version 1006 / 1011 133.4 7.5 1.0X - codegen version 1835 / 1839 73.1 13.7 0.5X - codegen version 64-bit 1627 / 1628 82.5 12.1 0.6X - */ + Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz + Hash For single ints: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative + ------------------------------------------------------------------------------------------------ + interpreted version 3262 / 3267 164.6 6.1 1.0X + codegen version 6448 / 6718 83.3 12.0 0.5X + codegen version 64-bit 6088 / 6154 88.2 11.3 0.5X + codegen HiveHash version 4732 / 4745 113.5 8.8 0.7X + */ test("single ints", singleInt, 1 << 15, 1 << 14) val singleLong = new StructType().add("i", LongType) /* - Intel(R) Core(TM) i7-4750HQ CPU @ 2.00GHz - Hash For single longs: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative - ------------------------------------------------------------------------------------------- - interpreted version 1196 / 1209 112.2 8.9 1.0X - codegen version 2178 / 2181 61.6 16.2 0.5X - codegen version 64-bit 1752 / 1753 76.6 13.1 0.7X - */ + Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz + Hash For single longs: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative + ------------------------------------------------------------------------------------------------ + interpreted version 3716 / 3726 144.5 6.9 1.0X + codegen version 7706 / 7732 69.7 14.4 0.5X + codegen version 64-bit 6370 / 6399 84.3 11.9 0.6X + codegen HiveHash version 4924 / 5026 109.0 9.2 0.8X + */ test("single longs", singleLong, 1 << 15, 1 << 14) val normal = new StructType() @@ -118,13 +132,14 @@ object HashBenchmark { .add("date", DateType) .add("timestamp", TimestampType) /* - Intel(R) Core(TM) i7-4750HQ CPU @ 2.00GHz - Hash For normal: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative - ------------------------------------------------------------------------------------------- - interpreted version 2713 / 2715 0.8 1293.5 1.0X - codegen version 2015 / 2018 1.0 960.9 1.3X - codegen version 64-bit 735 / 738 2.9 350.7 3.7X - */ + Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz + Hash For normal: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative + ------------------------------------------------------------------------------------------------ + interpreted version 2985 / 3013 0.7 1423.4 1.0X + codegen version 2422 / 2434 0.9 1155.1 1.2X + codegen version 64-bit 856 / 920 2.5 408.0 3.5X + codegen HiveHash version 4501 / 4979 0.5 2146.4 0.7X + */ test("normal", normal, 1 << 10, 1 << 11) val arrayOfInt = ArrayType(IntegerType) @@ -132,13 +147,14 @@ object HashBenchmark { .add("array", arrayOfInt) .add("arrayOfArray", ArrayType(arrayOfInt)) /* - Intel(R) Core(TM) i7-4750HQ CPU @ 2.00GHz - Hash For array: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative - ------------------------------------------------------------------------------------------- - interpreted version 1498 / 1499 0.1 11432.1 1.0X - codegen version 2642 / 2643 0.0 20158.4 0.6X - codegen version 64-bit 2421 / 2424 0.1 18472.5 0.6X - */ + Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz + Hash For array: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative + ------------------------------------------------------------------------------------------------ + interpreted version 3100 / 3555 0.0 23651.8 1.0X + codegen version 5779 / 5865 0.0 44088.4 0.5X + codegen version 64-bit 4738 / 4821 0.0 36151.7 0.7X + codegen HiveHash version 2200 / 2246 0.1 16785.9 1.4X + */ test("array", array, 1 << 8, 1 << 9) val mapOfInt = MapType(IntegerType, IntegerType) @@ -146,13 +162,14 @@ object HashBenchmark { .add("map", mapOfInt) .add("mapOfMap", MapType(IntegerType, mapOfInt)) /* - Intel(R) Core(TM) i7-4750HQ CPU @ 2.00GHz - Hash For map: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative - ------------------------------------------------------------------------------------------- - interpreted version 1612 / 1618 0.0 393553.4 1.0X - codegen version 149 / 150 0.0 36381.2 10.8X - codegen version 64-bit 144 / 145 0.0 35122.1 11.2X - */ + Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz + Hash For map: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative + ------------------------------------------------------------------------------------------------ + interpreted version 0 / 0 48.1 20.8 1.0X + codegen version 257 / 275 0.0 62768.7 0.0X + codegen version 64-bit 226 / 240 0.0 55224.5 0.0X + codegen HiveHash version 89 / 96 0.0 21708.8 0.0X + */ test("map", map, 1 << 6, 1 << 6) } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/HashByteArrayBenchmark.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/HashByteArrayBenchmark.scala index 53f21a8442..2a753a0c84 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/HashByteArrayBenchmark.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/HashByteArrayBenchmark.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql import java.util.Random -import org.apache.spark.sql.catalyst.expressions.XXH64 +import org.apache.spark.sql.catalyst.expressions.{HiveHasher, XXH64} import org.apache.spark.unsafe.Platform import org.apache.spark.unsafe.hash.Murmur3_x86_32 import org.apache.spark.util.Benchmark @@ -38,8 +38,8 @@ object HashByteArrayBenchmark { val benchmark = new Benchmark("Hash byte arrays with length " + length, iters * numArrays) benchmark.addCase("Murmur3_x86_32") { _: Int => + var sum = 0L for (_ <- 0L until iters) { - var sum = 0 var i = 0 while (i < numArrays) { sum += Murmur3_x86_32.hashUnsafeBytes(arrays(i), Platform.BYTE_ARRAY_OFFSET, length, 42) @@ -49,8 +49,8 @@ object HashByteArrayBenchmark { } benchmark.addCase("xxHash 64-bit") { _: Int => + var sum = 0L for (_ <- 0L until iters) { - var sum = 0L var i = 0 while (i < numArrays) { sum += XXH64.hashUnsafeBytes(arrays(i), Platform.BYTE_ARRAY_OFFSET, length, 42) @@ -59,90 +59,110 @@ object HashByteArrayBenchmark { } } + benchmark.addCase("HiveHasher") { _: Int => + var sum = 0L + for (_ <- 0L until iters) { + var i = 0 + while (i < numArrays) { + sum += HiveHasher.hashUnsafeBytes(arrays(i), Platform.BYTE_ARRAY_OFFSET, length) + i += 1 + } + } + } + benchmark.run() } def main(args: Array[String]): Unit = { /* - Intel(R) Core(TM) i7-4750HQ CPU @ 2.00GHz - Hash byte arrays with length 8: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative - ------------------------------------------------------------------------------------------- - Murmur3_x86_32 11 / 12 185.1 5.4 1.0X - xxHash 64-bit 17 / 18 120.0 8.3 0.6X + Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz + Hash byte arrays with length 8: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative + ------------------------------------------------------------------------------------------------ + Murmur3_x86_32 12 / 16 174.3 5.7 1.0X + xxHash 64-bit 17 / 22 120.0 8.3 0.7X + HiveHasher 13 / 15 162.1 6.2 0.9X */ test(8, 42L, 1 << 10, 1 << 11) /* - Intel(R) Core(TM) i7-4750HQ CPU @ 2.00GHz - Hash byte arrays with length 16: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative - ------------------------------------------------------------------------------------------- - Murmur3_x86_32 18 / 18 118.6 8.4 1.0X - xxHash 64-bit 20 / 21 102.5 9.8 0.9X + Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz + Hash byte arrays with length 16: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative + ------------------------------------------------------------------------------------------------ + Murmur3_x86_32 19 / 22 107.6 9.3 1.0X + xxHash 64-bit 20 / 24 104.6 9.6 1.0X + HiveHasher 24 / 28 87.0 11.5 0.8X */ test(16, 42L, 1 << 10, 1 << 11) /* - Intel(R) Core(TM) i7-4750HQ CPU @ 2.00GHz - Hash byte arrays with length 24: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative - ------------------------------------------------------------------------------------------- - Murmur3_x86_32 24 / 24 86.6 11.5 1.0X - xxHash 64-bit 23 / 23 93.2 10.7 1.1X + Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz + Hash byte arrays with length 24: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative + ------------------------------------------------------------------------------------------------ + Murmur3_x86_32 28 / 32 74.8 13.4 1.0X + xxHash 64-bit 24 / 29 87.3 11.5 1.2X + HiveHasher 36 / 41 57.7 17.3 0.8X */ test(24, 42L, 1 << 10, 1 << 11) // Add 31 to all arrays to create worse case alignment for xxHash. /* - Intel(R) Core(TM) i7-4750HQ CPU @ 2.00GHz - Hash byte arrays with length 31: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative - ------------------------------------------------------------------------------------------- - Murmur3_x86_32 38 / 39 54.7 18.3 1.0X - xxHash 64-bit 33 / 33 64.4 15.5 1.2X + Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz + Hash byte arrays with length 31: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative + ------------------------------------------------------------------------------------------------ + Murmur3_x86_32 41 / 45 51.1 19.6 1.0X + xxHash 64-bit 36 / 44 58.8 17.0 1.2X + HiveHasher 49 / 54 42.6 23.5 0.8X */ test(31, 42L, 1 << 10, 1 << 11) /* - Intel(R) Core(TM) i7-4750HQ CPU @ 2.00GHz - Hash byte arrays with length 95: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative - ------------------------------------------------------------------------------------------- - Murmur3_x86_32 91 / 94 22.9 43.6 1.0X - xxHash 64-bit 68 / 69 30.6 32.7 1.3X + Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz + Hash byte arrays with length 95: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative + ------------------------------------------------------------------------------------------------ + Murmur3_x86_32 100 / 110 21.0 47.7 1.0X + xxHash 64-bit 74 / 78 28.2 35.5 1.3X + HiveHasher 189 / 196 11.1 90.3 0.5X */ test(64 + 31, 42L, 1 << 10, 1 << 11) /* - Intel(R) Core(TM) i7-4750HQ CPU @ 2.00GHz - Hash byte arrays with length 287: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative - ------------------------------------------------------------------------------------------- - Murmur3_x86_32 268 / 268 7.8 127.6 1.0X - xxHash 64-bit 108 / 109 19.4 51.6 2.5X + Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz + Hash byte arrays with length 287: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative + ------------------------------------------------------------------------------------------------ + Murmur3_x86_32 299 / 311 7.0 142.4 1.0X + xxHash 64-bit 113 / 122 18.5 54.1 2.6X + HiveHasher 620 / 624 3.4 295.5 0.5X */ test(256 + 31, 42L, 1 << 10, 1 << 11) /* - Intel(R) Core(TM) i7-4750HQ CPU @ 2.00GHz - Hash byte arrays with length 1055: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative - ------------------------------------------------------------------------------------------- - Murmur3_x86_32 942 / 945 2.2 449.4 1.0X - xxHash 64-bit 276 / 276 7.6 131.4 3.4X + Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz + Hash byte arrays with length 1055: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative + ------------------------------------------------------------------------------------------------ + Murmur3_x86_32 1068 / 1070 2.0 509.1 1.0X + xxHash 64-bit 306 / 315 6.9 145.9 3.5X + HiveHasher 2316 / 2369 0.9 1104.3 0.5X */ test(1024 + 31, 42L, 1 << 10, 1 << 11) /* - Intel(R) Core(TM) i7-4750HQ CPU @ 2.00GHz - Hash byte arrays with length 2079: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative - ------------------------------------------------------------------------------------------- - Murmur3_x86_32 1839 / 1843 1.1 876.8 1.0X - xxHash 64-bit 445 / 448 4.7 212.1 4.1X + Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz + Hash byte arrays with length 2079: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative + ------------------------------------------------------------------------------------------------ + Murmur3_x86_32 2252 / 2274 0.9 1074.1 1.0X + xxHash 64-bit 534 / 580 3.9 254.6 4.2X + HiveHasher 4739 / 4786 0.4 2259.8 0.5X */ test(2048 + 31, 42L, 1 << 10, 1 << 11) /* - Intel(R) Core(TM) i7-4750HQ CPU @ 2.00GHz - Hash byte arrays with length 8223: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative - ------------------------------------------------------------------------------------------- - Murmur3_x86_32 7307 / 7310 0.3 3484.4 1.0X - xxHash 64-bit 1487 / 1488 1.4 709.1 4.9X - */ + Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz + Hash byte arrays with length 8223: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative + ------------------------------------------------------------------------------------------------ + Murmur3_x86_32 9249 / 9586 0.2 4410.5 1.0X + xxHash 64-bit 2897 / 3241 0.7 1381.6 3.2X + HiveHasher 19392 / 20211 0.1 9246.6 0.5X + */ test(8192 + 31, 42L, 1 << 10, 1 << 11) } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MiscFunctionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MiscFunctionsSuite.scala index 33916c0891..13ce588462 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MiscFunctionsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MiscFunctionsSuite.scala @@ -145,7 +145,7 @@ class MiscFunctionsSuite extends SparkFunSuite with ExpressionEvalHelper { val inputGenerator = RandomDataGenerator.forType(inputSchema, nullable = false).get val encoder = RowEncoder(inputSchema) val seed = scala.util.Random.nextInt() - test(s"murmur3/xxHash64 hash: ${inputSchema.simpleString}") { + test(s"murmur3/xxHash64/hive hash: ${inputSchema.simpleString}") { for (_ <- 1 to 10) { val input = encoder.toRow(inputGenerator.apply().asInstanceOf[Row]).asInstanceOf[UnsafeRow] val literals = input.toSeq(inputSchema).zip(inputSchema.map(_.dataType)).map { @@ -154,6 +154,7 @@ class MiscFunctionsSuite extends SparkFunSuite with ExpressionEvalHelper { // Only test the interpreted version has same result with codegen version. checkEvaluation(Murmur3Hash(literals, seed), Murmur3Hash(literals, seed).eval()) checkEvaluation(XxHash64(literals, seed), XxHash64(literals, seed).eval()) + checkEvaluation(HiveHash(literals), HiveHash(literals).eval()) } } } |