| | | |
|---|---|---|
| author | Tejas Patil <tejasp@fb.com> | 2017-03-12 20:08:44 -0700 |
| committer | Xiao Li <gatorsmile@gmail.com> | 2017-03-12 20:08:44 -0700 |
| commit | 9456688547522a62f1e7520e9b3564550c57aa5d (patch) | |
| tree | 6c4411d2b0edc9ac54dbc00e9e657e73d1265eaa | |
| parent | 0a4d06a7c3db9fec2b6f050a631e8b59b0e9376e (diff) | |
| download | spark-9456688547522a62f1e7520e9b3564550c57aa5d.tar.gz, spark-9456688547522a62f1e7520e9b3564550c57aa5d.tar.bz2, spark-9456688547522a62f1e7520e9b3564550c57aa5d.zip | |
[SPARK-17495][SQL] Support date, timestamp and interval types in Hive hash
## What changes were proposed in this pull request?
- Timestamp hashing is done as per [TimestampWritable.hashCode()](https://github.com/apache/hive/blob/ff67cdda1c538dc65087878eeba3e165cf3230f4/serde/src/java/org/apache/hadoop/hive/serde2/io/TimestampWritable.java#L406) in Hive (see the first sketch after this list)
- Interval hashing is done as per [HiveIntervalDayTime.hashCode()](https://github.com/apache/hive/blob/ff67cdda1c538dc65087878eeba3e165cf3230f4/storage-api/src/java/org/apache/hadoop/hive/common/type/HiveIntervalDayTime.java#L178) (see the second sketch after this list). Note that there are inherent differences in how Hive and Spark store intervals under the hood, which limits the ability to stay completely in sync with Hive's hashing function. I have explained this in the method doc.
- Date type was already supported. This PR adds tests for it.
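For illustration, here is a standalone sketch of the timestamp scheme, distilled from the `hashTimestamp` method this patch adds to `HiveHashFunction` (the wrapper object and the `main` demo are ours, not part of the patch):

```scala
object HiveHashTimestampSketch {
  // Mirrors Hive's TimestampWritable.hashCode(): split the timestamp
  // (microseconds since epoch) into a seconds part and a nanoseconds
  // remainder, pack both into one long (the nanosecond part fits in
  // 30 bits), then XOR-fold the upper and lower 32 bits into an int.
  def hashTimestamp(timestampMicros: Long): Int = {
    val seconds = timestampMicros / 1000000
    val nanos = (timestampMicros % 1000000) * 1000
    var result = seconds
    result <<= 30
    result |= nanos
    ((result >>> 32) ^ result).toInt
  }

  def main(args: Array[String]): Unit = {
    // The epoch hashes to 0, matching the "1970-01-01 00:00:00"
    // expectation in the tests added below.
    println(hashTimestamp(0L)) // prints 0
  }
}
```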
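Likewise, a minimal sketch of the interval scheme, distilled from the patch's `hashCalendarInterval`; for brevity it takes the interval's microsecond component as a plain `Long` rather than a `CalendarInterval` (the month component is ignored by the patched method as well):

```scala
object HiveHashIntervalSketch {
  // Mirrors Hive's HiveIntervalDayTime.hashCode(): combine the whole
  // seconds (XOR-folded to an int) and the nanoseconds-of-second
  // remainder using the classic 17/37 hashCode scheme.
  def hashInterval(intervalMicros: Long): Int = {
    val totalSeconds = intervalMicros / 1000000L
    val seed = (17 * 37) + (totalSeconds ^ (totalSeconds >> 32)).toInt
    // Microseconds left over within the current second, scaled to nanoseconds.
    val nanoSeconds = (intervalMicros - totalSeconds * 1000000L).toInt * 1000
    (seed * 37) + nanoSeconds
  }

  def main(args: Array[String]): Unit = {
    println(hashInterval(0L)) // prints 23273: the "interval 0 second" case below
    println(hashInterval(1L)) // prints 24273: the "interval 1 microsecond" case below
  }
}
```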
## How was this patch tested?
Added unit tests
Author: Tejas Patil <tejasp@fb.com>
Closes #17062 from tejasapatil/SPARK-17495_time_related_types.
2 files changed, 268 insertions, 11 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/hash.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/hash.scala
index 03101b4bfc..2a5963d37f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/hash.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/hash.scala
@@ -335,6 +335,8 @@ abstract class HashExpression[E] extends Expression {
     }
   }
 
+  protected def genHashTimestamp(t: String, result: String): String = genHashLong(t, result)
+
   protected def genHashCalendarInterval(input: String, result: String): String = {
     val microsecondsHash = s"$hasherClassName.hashLong($input.microseconds, $result)"
     s"$result = $hasherClassName.hashInt($input.months, $microsecondsHash);"
@@ -400,7 +402,8 @@ abstract class HashExpression[E] extends Expression {
       case NullType => ""
       case BooleanType => genHashBoolean(input, result)
       case ByteType | ShortType | IntegerType | DateType => genHashInt(input, result)
-      case LongType | TimestampType => genHashLong(input, result)
+      case LongType => genHashLong(input, result)
+      case TimestampType => genHashTimestamp(input, result)
       case FloatType => genHashFloat(input, result)
       case DoubleType => genHashDouble(input, result)
       case d: DecimalType => genHashDecimal(ctx, d, input, result)
@@ -433,6 +436,10 @@ abstract class InterpretedHashFunction {
   protected def hashUnsafeBytes(base: AnyRef, offset: Long, length: Int, seed: Long): Long
 
+  /**
+   * Computes hash of a given `value` of type `dataType`. The caller needs to check the validity
+   * of input `value`.
+   */
   def hash(value: Any, dataType: DataType, seed: Long): Long = {
     value match {
       case null => seed
@@ -580,8 +587,6 @@ object XxHash64Function extends InterpretedHashFunction {
  *
  * We should use this hash function for both shuffle and bucket of Hive tables, so that
  * we can guarantee shuffle and bucketing have same data distribution
- *
- * TODO: Support date related types
  */
 @ExpressionDescription(
   usage = "_FUNC_(expr1, expr2, ...) - Returns a hash value of the arguments.")
@@ -648,11 +653,16 @@ case class HiveHash(children: Seq[Expression]) extends HashExpression[Int] {
 
   override protected def genHashCalendarInterval(input: String, result: String): String = {
     s"""
-      $result = (31 * $hasherClassName.hashInt($input.months)) +
-        $hasherClassName.hashLong($input.microseconds);"
+      $result = (int)
+        ${HiveHashFunction.getClass.getName.stripSuffix("$")}.hashCalendarInterval($input);
     """
   }
 
+  override protected def genHashTimestamp(input: String, result: String): String =
+    s"""
+      $result = (int) ${HiveHashFunction.getClass.getName.stripSuffix("$")}.hashTimestamp($input);
+    """
+
   override protected def genHashString(input: String, result: String): String = {
     val baseObject = s"$input.getBaseObject()"
     val baseOffset = s"$input.getBaseOffset()"
@@ -781,6 +791,49 @@ object HiveHashFunction extends InterpretedHashFunction {
     result
   }
 
+  /**
+   * Mimics TimestampWritable.hashCode() in Hive
+   */
+  def hashTimestamp(timestamp: Long): Long = {
+    val timestampInSeconds = timestamp / 1000000
+    val nanoSecondsPortion = (timestamp % 1000000) * 1000
+
+    var result = timestampInSeconds
+    result <<= 30 // the nanosecond part fits in 30 bits
+    result |= nanoSecondsPortion
+    ((result >>> 32) ^ result).toInt
+  }
+
+  /**
+   * Hive allows input intervals to be defined using units below but the intervals
+   * have to be from the same category:
+   *  - year, month (stored as HiveIntervalYearMonth)
+   *  - day, hour, minute, second, nanosecond (stored as HiveIntervalDayTime)
+   *
+   * eg. (INTERVAL '30' YEAR + INTERVAL '-23' DAY) fails in Hive
+   *
+   * This method mimics HiveIntervalDayTime.hashCode() in Hive.
+   *
+   * Two differences wrt Hive due to how intervals are stored in Spark vs Hive:
+   *
+   * - If the `INTERVAL` is backed as HiveIntervalYearMonth in Hive, then this method will not
+   *   produce Hive compatible result. The reason being Spark's representation of calendar does not
+   *   have such categories based on the interval and is unified.
+   *
+   * - Spark's [[CalendarInterval]] has precision upto microseconds but Hive's
+   *   HiveIntervalDayTime can store data with precision upto nanoseconds. So, any input intervals
+   *   with nanosecond values will lead to wrong output hashes (ie. non adherent with Hive output)
+   */
+  def hashCalendarInterval(calendarInterval: CalendarInterval): Long = {
+    val totalSeconds = calendarInterval.microseconds / CalendarInterval.MICROS_PER_SECOND.toInt
+    val result: Int = (17 * 37) + (totalSeconds ^ totalSeconds >> 32).toInt
+
+    val nanoSeconds =
+      (calendarInterval.microseconds -
+        (totalSeconds * CalendarInterval.MICROS_PER_SECOND.toInt)).toInt * 1000
+    (result * 37) + nanoSeconds
+  }
+
   override def hash(value: Any, dataType: DataType, seed: Long): Long = {
     value match {
       case null => 0
@@ -834,10 +887,10 @@ object HiveHashFunction extends InterpretedHashFunction {
         }
         result
 
-      case d: Decimal =>
-        normalizeDecimal(d.toJavaBigDecimal).hashCode()
-
-      case _ => super.hash(value, dataType, seed)
+      case d: Decimal => normalizeDecimal(d.toJavaBigDecimal).hashCode()
+      case timestamp: Long if dataType.isInstanceOf[TimestampType] => hashTimestamp(timestamp)
+      case calendarInterval: CalendarInterval => hashCalendarInterval(calendarInterval)
+      case _ => super.hash(value, dataType, 0)
     }
   }
 }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/HashExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/HashExpressionsSuite.scala
index 0c77dc2709..59fc8eaf73 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/HashExpressionsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/HashExpressionsSuite.scala
@@ -18,18 +18,20 @@ package org.apache.spark.sql.catalyst.expressions
 
 import java.nio.charset.StandardCharsets
+import java.util.TimeZone
 
 import scala.collection.mutable.ArrayBuffer
 
 import org.apache.commons.codec.digest.DigestUtils
+import org.scalatest.exceptions.TestFailedException
 
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.{RandomDataGenerator, Row}
 import org.apache.spark.sql.catalyst.encoders.{ExamplePointUDT, RowEncoder}
 import org.apache.spark.sql.catalyst.expressions.codegen.GenerateMutableProjection
-import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, GenericArrayData}
+import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, DateTimeUtils, GenericArrayData}
 import org.apache.spark.sql.types.{ArrayType, StructType, _}
-import org.apache.spark.unsafe.types.UTF8String
+import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String}
 
 class HashExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
   val random = new scala.util.Random
@@ -168,6 +170,208 @@ class HashExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
     // scalastyle:on nonascii
   }
 
+  test("hive-hash for date type") {
+    def checkHiveHashForDateType(dateString: String, expected: Long): Unit = {
+      checkHiveHash(
+        DateTimeUtils.stringToDate(UTF8String.fromString(dateString)).get,
+        DateType,
+        expected)
+    }
+
+    // basic case
+    checkHiveHashForDateType("2017-01-01", 17167)
+
+    // boundary cases
+    checkHiveHashForDateType("0000-01-01", -719530)
+    checkHiveHashForDateType("9999-12-31", 2932896)
+
+    // epoch
+    checkHiveHashForDateType("1970-01-01", 0)
+
+    // before epoch
+    checkHiveHashForDateType("1800-01-01", -62091)
+
+    // Invalid input: bad date string. Hive returns 0 for such cases
+    intercept[NoSuchElementException](checkHiveHashForDateType("0-0-0", 0))
+    intercept[NoSuchElementException](checkHiveHashForDateType("-1212-01-01", 0))
+    intercept[NoSuchElementException](checkHiveHashForDateType("2016-99-99", 0))
+
+    // Invalid input: Empty string. Hive returns 0 for this case
+    intercept[NoSuchElementException](checkHiveHashForDateType("", 0))
+
+    // Invalid input: February 30th for a leap year. Hive supports this but Spark doesn't
+    intercept[NoSuchElementException](checkHiveHashForDateType("2016-02-30", 16861))
+  }
+
+  test("hive-hash for timestamp type") {
+    def checkHiveHashForTimestampType(
+        timestamp: String,
+        expected: Long,
+        timeZone: TimeZone = TimeZone.getTimeZone("UTC")): Unit = {
+      checkHiveHash(
+        DateTimeUtils.stringToTimestamp(UTF8String.fromString(timestamp), timeZone).get,
+        TimestampType,
+        expected)
+    }
+
+    // basic case
+    checkHiveHashForTimestampType("2017-02-24 10:56:29", 1445725271)
+
+    // with higher precision
+    checkHiveHashForTimestampType("2017-02-24 10:56:29.111111", 1353936655)
+
+    // with different timezone
+    checkHiveHashForTimestampType("2017-02-24 10:56:29", 1445732471,
+      TimeZone.getTimeZone("US/Pacific"))
+
+    // boundary cases
+    checkHiveHashForTimestampType("0001-01-01 00:00:00", 1645926784)
+    checkHiveHashForTimestampType("9999-01-01 00:00:00", -1081818240)
+
+    // epoch
+    checkHiveHashForTimestampType("1970-01-01 00:00:00", 0)
+
+    // before epoch
+    checkHiveHashForTimestampType("1800-01-01 03:12:45", -267420885)
+
+    // Invalid input: bad timestamp string. Hive returns 0 for such cases
+    intercept[NoSuchElementException](checkHiveHashForTimestampType("0-0-0 0:0:0", 0))
+    intercept[NoSuchElementException](checkHiveHashForTimestampType("-99-99-99 99:99:45", 0))
+    intercept[NoSuchElementException](checkHiveHashForTimestampType("555555-55555-5555", 0))
+
+    // Invalid input: Empty string. Hive returns 0 for this case
+    intercept[NoSuchElementException](checkHiveHashForTimestampType("", 0))
+
+    // Invalid input: February 30th is a leap year. Hive supports this but Spark doesn't
+    intercept[NoSuchElementException](checkHiveHashForTimestampType("2016-02-30 00:00:00", 0))
+
+    // Invalid input: Hive accepts upto 9 decimal place precision but Spark uses upto 6
+    intercept[TestFailedException](checkHiveHashForTimestampType("2017-02-24 10:56:29.11111111", 0))
+  }
+
+  test("hive-hash for CalendarInterval type") {
+    def checkHiveHashForIntervalType(interval: String, expected: Long): Unit = {
+      checkHiveHash(CalendarInterval.fromString(interval), CalendarIntervalType, expected)
+    }
+
+    // ----- MICROSEC -----
+
+    // basic case
+    checkHiveHashForIntervalType("interval 1 microsecond", 24273)
+
+    // negative
+    checkHiveHashForIntervalType("interval -1 microsecond", 22273)
+
+    // edge / boundary cases
+    checkHiveHashForIntervalType("interval 0 microsecond", 23273)
+    checkHiveHashForIntervalType("interval 999 microsecond", 1022273)
+    checkHiveHashForIntervalType("interval -999 microsecond", -975727)
+
+    // ----- MILLISEC -----
+
+    // basic case
+    checkHiveHashForIntervalType("interval 1 millisecond", 1023273)
+
+    // negative
+    checkHiveHashForIntervalType("interval -1 millisecond", -976727)
+
+    // edge / boundary cases
+    checkHiveHashForIntervalType("interval 0 millisecond", 23273)
+    checkHiveHashForIntervalType("interval 999 millisecond", 999023273)
+    checkHiveHashForIntervalType("interval -999 millisecond", -998976727)
+
+    // ----- SECOND -----
+
+    // basic case
+    checkHiveHashForIntervalType("interval 1 second", 23310)
+
+    // negative
+    checkHiveHashForIntervalType("interval -1 second", 23273)
+
+    // edge / boundary cases
+    checkHiveHashForIntervalType("interval 0 second", 23273)
+    checkHiveHashForIntervalType("interval 2147483647 second", -2147460412)
+    checkHiveHashForIntervalType("interval -2147483648 second", -2147460412)
+
+    // Out of range for both Hive and Spark
+    // Hive throws an exception. Spark overflows and returns wrong output
+    // checkHiveHashForIntervalType("interval 9999999999 second", 0)
+
+    // ----- MINUTE -----
+
+    // basic cases
+    checkHiveHashForIntervalType("interval 1 minute", 25493)
+
+    // negative
+    checkHiveHashForIntervalType("interval -1 minute", 25456)
+
+    // edge / boundary cases
+    checkHiveHashForIntervalType("interval 0 minute", 23273)
+    checkHiveHashForIntervalType("interval 2147483647 minute", 21830)
+    checkHiveHashForIntervalType("interval -2147483648 minute", 22163)
+
+    // Out of range for both Hive and Spark
+    // Hive throws an exception. Spark overflows and returns wrong output
+    // checkHiveHashForIntervalType("interval 9999999999 minute", 0)
+
+    // ----- HOUR -----
+
+    // basic case
+    checkHiveHashForIntervalType("interval 1 hour", 156473)
+
+    // negative
+    checkHiveHashForIntervalType("interval -1 hour", 156436)
+
+    // edge / boundary cases
+    checkHiveHashForIntervalType("interval 0 hour", 23273)
+    checkHiveHashForIntervalType("interval 2147483647 hour", -62308)
+    checkHiveHashForIntervalType("interval -2147483648 hour", -43327)
+
+    // Out of range for both Hive and Spark
+    // Hive throws an exception. Spark overflows and returns wrong output
+    // checkHiveHashForIntervalType("interval 9999999999 hour", 0)
+
+    // ----- DAY -----
+
+    // basic cases
+    checkHiveHashForIntervalType("interval 1 day", 3220073)
+
+    // negative
+    checkHiveHashForIntervalType("interval -1 day", 3220036)
+
+    // edge / boundary cases
+    checkHiveHashForIntervalType("interval 0 day", 23273)
+    checkHiveHashForIntervalType("interval 106751991 day", -451506760)
+    checkHiveHashForIntervalType("interval -106751991 day", -451514123)
+
+    // Hive supports `day` for a longer range but Spark's range is smaller
+    // The check for range is done at the parser level so this does not fail in Spark
+    // checkHiveHashForIntervalType("interval -2147483648 day", -1575127)
+    // checkHiveHashForIntervalType("interval 2147483647 day", -4767228)
+
+    // Out of range for both Hive and Spark
+    // Hive throws an exception. Spark overflows and returns wrong output
+    // checkHiveHashForIntervalType("interval 9999999999 day", 0)
+
+    // ----- MIX -----
+
+    checkHiveHashForIntervalType("interval 0 day 0 hour", 23273)
+    checkHiveHashForIntervalType("interval 0 day 0 hour 0 minute", 23273)
+    checkHiveHashForIntervalType("interval 0 day 0 hour 0 minute 0 second", 23273)
+    checkHiveHashForIntervalType("interval 0 day 0 hour 0 minute 0 second 0 millisecond", 23273)
+    checkHiveHashForIntervalType(
+      "interval 0 day 0 hour 0 minute 0 second 0 millisecond 0 microsecond", 23273)
+
+    checkHiveHashForIntervalType("interval 6 day 15 hour", 21202073)
+    checkHiveHashForIntervalType("interval 5 day 4 hour 8 minute", 16557833)
+    checkHiveHashForIntervalType("interval -23 day 56 hour -1111113 minute 9898989 second",
+      -2128468593)
+    checkHiveHashForIntervalType("interval 66 day 12 hour 39 minute 23 second 987 millisecond",
+      1199697904)
+    checkHiveHashForIntervalType(
+      "interval 66 day 12 hour 39 minute 23 second 987 millisecond 123 microsecond", 1199820904)
+  }
+
   test("hive-hash for array") {
     // empty array
     checkHiveHash(