author    Tejas Patil <tejasp@fb.com>    2017-03-12 20:08:44 -0700
committer Xiao Li <gatorsmile@gmail.com> 2017-03-12 20:08:44 -0700
commit    9456688547522a62f1e7520e9b3564550c57aa5d (patch)
tree      6c4411d2b0edc9ac54dbc00e9e657e73d1265eaa /sql
parent    0a4d06a7c3db9fec2b6f050a631e8b59b0e9376e (diff)
[SPARK-17495][SQL] Support date, timestamp and interval types in Hive hash
## What changes were proposed in this pull request?

- Timestamp hashing is done as per [TimestampWritable.hashCode()](https://github.com/apache/hive/blob/ff67cdda1c538dc65087878eeba3e165cf3230f4/serde/src/java/org/apache/hadoop/hive/serde2/io/TimestampWritable.java#L406) in Hive.
- Interval hashing is done as per [HiveIntervalDayTime.hashCode()](https://github.com/apache/hive/blob/ff67cdda1c538dc65087878eeba3e165cf3230f4/storage-api/src/java/org/apache/hadoop/hive/common/type/HiveIntervalDayTime.java#L178). Note that there are inherent differences in how Hive and Spark store intervals under the hood, which limits the ability to stay completely in sync with Hive's hashing function. I have explained this in the method doc.
- Date type was already supported; this PR adds a test for it.

## How was this patch tested?

Added unit tests.

Author: Tejas Patil <tejasp@fb.com>

Closes #17062 from tejasapatil/SPARK-17495_time_related_types.
Diffstat (limited to 'sql')
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/hash.scala                  71
-rw-r--r--  sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/HashExpressionsSuite.scala  208
2 files changed, 268 insertions(+), 11 deletions(-)
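The timestamp hashing described in the commit message boils down to the same fold of seconds and nanoseconds that Hive's `TimestampWritable.hashCode()` performs. As a rough standalone sketch (the object name is illustrative and not part of the patch; the real implementation is `HiveHashFunction.hashTimestamp` in the hash.scala diff below), the logic can be reproduced and checked against the epoch case from the test suite:

```scala
// Illustrative sketch of the Hive-compatible timestamp hash added by this patch;
// not the patch's own code.
object HiveHashTimestampSketch {
  // `timestamp` is microseconds since the epoch, which is how Spark stores TimestampType.
  def hashTimestamp(timestamp: Long): Int = {
    val seconds = timestamp / 1000000          // whole-seconds portion
    val nanos = (timestamp % 1000000) * 1000   // sub-second portion, in nanoseconds
    var result = seconds
    result <<= 30                              // the nanosecond part fits in 30 bits
    result |= nanos
    ((result >>> 32) ^ result).toInt           // fold the 64-bit value into 32 bits
  }

  def main(args: Array[String]): Unit = {
    println(hashTimestamp(0L)) // 0 -- matches the "epoch" case in HashExpressionsSuite below
  }
}
```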
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/hash.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/hash.scala
index 03101b4bfc..2a5963d37f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/hash.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/hash.scala
@@ -335,6 +335,8 @@ abstract class HashExpression[E] extends Expression {
}
}
+ protected def genHashTimestamp(t: String, result: String): String = genHashLong(t, result)
+
protected def genHashCalendarInterval(input: String, result: String): String = {
val microsecondsHash = s"$hasherClassName.hashLong($input.microseconds, $result)"
s"$result = $hasherClassName.hashInt($input.months, $microsecondsHash);"
@@ -400,7 +402,8 @@ abstract class HashExpression[E] extends Expression {
case NullType => ""
case BooleanType => genHashBoolean(input, result)
case ByteType | ShortType | IntegerType | DateType => genHashInt(input, result)
- case LongType | TimestampType => genHashLong(input, result)
+ case LongType => genHashLong(input, result)
+ case TimestampType => genHashTimestamp(input, result)
case FloatType => genHashFloat(input, result)
case DoubleType => genHashDouble(input, result)
case d: DecimalType => genHashDecimal(ctx, d, input, result)
@@ -433,6 +436,10 @@ abstract class InterpretedHashFunction {
protected def hashUnsafeBytes(base: AnyRef, offset: Long, length: Int, seed: Long): Long
+ /**
+ * Computes hash of a given `value` of type `dataType`. The caller needs to check the validity
+ * of input `value`.
+ */
def hash(value: Any, dataType: DataType, seed: Long): Long = {
value match {
case null => seed
@@ -580,8 +587,6 @@ object XxHash64Function extends InterpretedHashFunction {
*
* We should use this hash function for both shuffle and bucket of Hive tables, so that
* we can guarantee shuffle and bucketing have same data distribution
- *
- * TODO: Support date related types
*/
@ExpressionDescription(
usage = "_FUNC_(expr1, expr2, ...) - Returns a hash value of the arguments.")
@@ -648,11 +653,16 @@ case class HiveHash(children: Seq[Expression]) extends HashExpression[Int] {
override protected def genHashCalendarInterval(input: String, result: String): String = {
s"""
- $result = (31 * $hasherClassName.hashInt($input.months)) +
- $hasherClassName.hashLong($input.microseconds);"
+ $result = (int)
+ ${HiveHashFunction.getClass.getName.stripSuffix("$")}.hashCalendarInterval($input);
"""
}
+ override protected def genHashTimestamp(input: String, result: String): String =
+ s"""
+ $result = (int) ${HiveHashFunction.getClass.getName.stripSuffix("$")}.hashTimestamp($input);
+ """
+
override protected def genHashString(input: String, result: String): String = {
val baseObject = s"$input.getBaseObject()"
val baseOffset = s"$input.getBaseOffset()"
@@ -781,6 +791,49 @@ object HiveHashFunction extends InterpretedHashFunction {
result
}
+ /**
+ * Mimics TimestampWritable.hashCode() in Hive
+ */
+ def hashTimestamp(timestamp: Long): Long = {
+ val timestampInSeconds = timestamp / 1000000
+ val nanoSecondsPortion = (timestamp % 1000000) * 1000
+
+ var result = timestampInSeconds
+ result <<= 30 // the nanosecond part fits in 30 bits
+ result |= nanoSecondsPortion
+ ((result >>> 32) ^ result).toInt
+ }
+
+ /**
+ * Hive allows input intervals to be defined using units below but the intervals
+ * have to be from the same category:
+ * - year, month (stored as HiveIntervalYearMonth)
+ * - day, hour, minute, second, nanosecond (stored as HiveIntervalDayTime)
+ *
+ * e.g. (INTERVAL '30' YEAR + INTERVAL '-23' DAY) fails in Hive
+ *
+ * This method mimics HiveIntervalDayTime.hashCode() in Hive.
+ *
+ * Two differences w.r.t. Hive, due to how intervals are stored in Spark vs. Hive:
+ *
+ * - If the `INTERVAL` is backed as HiveIntervalYearMonth in Hive, then this method will not
+ * produce a Hive-compatible result. The reason is that Spark's calendar interval representation
+ * is unified and does not have such interval-based categories.
+ *
+ * - Spark's [[CalendarInterval]] has precision up to microseconds, but Hive's
+ * HiveIntervalDayTime can store data with precision up to nanoseconds. So any input interval
+ * with nanosecond values will lead to wrong output hashes (i.e. not adherent to Hive's output).
+ */
+ def hashCalendarInterval(calendarInterval: CalendarInterval): Long = {
+ val totalSeconds = calendarInterval.microseconds / CalendarInterval.MICROS_PER_SECOND.toInt
+ val result: Int = (17 * 37) + (totalSeconds ^ totalSeconds >> 32).toInt
+
+ val nanoSeconds =
+ (calendarInterval.microseconds -
+ (totalSeconds * CalendarInterval.MICROS_PER_SECOND.toInt)).toInt * 1000
+ (result * 37) + nanoSeconds
+ }
+
override def hash(value: Any, dataType: DataType, seed: Long): Long = {
value match {
case null => 0
@@ -834,10 +887,10 @@ object HiveHashFunction extends InterpretedHashFunction {
}
result
- case d: Decimal =>
- normalizeDecimal(d.toJavaBigDecimal).hashCode()
-
- case _ => super.hash(value, dataType, seed)
+ case d: Decimal => normalizeDecimal(d.toJavaBigDecimal).hashCode()
+ case timestamp: Long if dataType.isInstanceOf[TimestampType] => hashTimestamp(timestamp)
+ case calendarInterval: CalendarInterval => hashCalendarInterval(calendarInterval)
+ case _ => super.hash(value, dataType, 0)
}
}
}
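As a cross-check of `hashCalendarInterval` above, here is a minimal standalone sketch that reproduces two of the expected values exercised by the test suite below. It assumes `1000000L` for `CalendarInterval.MICROS_PER_SECOND`, uses an illustrative object name, and, like the patch, looks only at the microseconds field of the interval:

```scala
// Standalone sketch mirroring HiveIntervalDayTime.hashCode(); not the patch's own code.
object HiveHashIntervalSketch {
  def hashCalendarInterval(microseconds: Long): Int = {
    val totalSeconds = microseconds / 1000000L
    // 17 * 37 is the seed that HiveIntervalDayTime.hashCode() starts from
    val result = (17 * 37) + (totalSeconds ^ (totalSeconds >> 32)).toInt
    val nanoSeconds = ((microseconds - totalSeconds * 1000000L) * 1000L).toInt
    (result * 37) + nanoSeconds
  }

  def main(args: Array[String]): Unit = {
    println(hashCalendarInterval(86400L * 1000000L)) // "interval 1 day"         -> 3220073
    println(hashCalendarInterval(1L))                // "interval 1 microsecond" -> 24273
  }
}
```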
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/HashExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/HashExpressionsSuite.scala
index 0c77dc2709..59fc8eaf73 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/HashExpressionsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/HashExpressionsSuite.scala
@@ -18,18 +18,20 @@
package org.apache.spark.sql.catalyst.expressions
import java.nio.charset.StandardCharsets
+import java.util.TimeZone
import scala.collection.mutable.ArrayBuffer
import org.apache.commons.codec.digest.DigestUtils
+import org.scalatest.exceptions.TestFailedException
import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.{RandomDataGenerator, Row}
import org.apache.spark.sql.catalyst.encoders.{ExamplePointUDT, RowEncoder}
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateMutableProjection
-import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, GenericArrayData}
+import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, DateTimeUtils, GenericArrayData}
import org.apache.spark.sql.types.{ArrayType, StructType, _}
-import org.apache.spark.unsafe.types.UTF8String
+import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String}
class HashExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
val random = new scala.util.Random
@@ -168,6 +170,208 @@ class HashExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
// scalastyle:on nonascii
}
+ test("hive-hash for date type") {
+ def checkHiveHashForDateType(dateString: String, expected: Long): Unit = {
+ checkHiveHash(
+ DateTimeUtils.stringToDate(UTF8String.fromString(dateString)).get,
+ DateType,
+ expected)
+ }
+
+ // basic case
+ checkHiveHashForDateType("2017-01-01", 17167)
+
+ // boundary cases
+ checkHiveHashForDateType("0000-01-01", -719530)
+ checkHiveHashForDateType("9999-12-31", 2932896)
+
+ // epoch
+ checkHiveHashForDateType("1970-01-01", 0)
+
+ // before epoch
+ checkHiveHashForDateType("1800-01-01", -62091)
+
+ // Invalid input: bad date string. Hive returns 0 for such cases
+ intercept[NoSuchElementException](checkHiveHashForDateType("0-0-0", 0))
+ intercept[NoSuchElementException](checkHiveHashForDateType("-1212-01-01", 0))
+ intercept[NoSuchElementException](checkHiveHashForDateType("2016-99-99", 0))
+
+ // Invalid input: Empty string. Hive returns 0 for this case
+ intercept[NoSuchElementException](checkHiveHashForDateType("", 0))
+
+ // Invalid input: February 30th for a leap year. Hive supports this but Spark doesn't
+ intercept[NoSuchElementException](checkHiveHashForDateType("2016-02-30", 16861))
+ }
+
+ test("hive-hash for timestamp type") {
+ def checkHiveHashForTimestampType(
+ timestamp: String,
+ expected: Long,
+ timeZone: TimeZone = TimeZone.getTimeZone("UTC")): Unit = {
+ checkHiveHash(
+ DateTimeUtils.stringToTimestamp(UTF8String.fromString(timestamp), timeZone).get,
+ TimestampType,
+ expected)
+ }
+
+ // basic case
+ checkHiveHashForTimestampType("2017-02-24 10:56:29", 1445725271)
+
+ // with higher precision
+ checkHiveHashForTimestampType("2017-02-24 10:56:29.111111", 1353936655)
+
+ // with different timezone
+ checkHiveHashForTimestampType("2017-02-24 10:56:29", 1445732471,
+ TimeZone.getTimeZone("US/Pacific"))
+
+ // boundary cases
+ checkHiveHashForTimestampType("0001-01-01 00:00:00", 1645926784)
+ checkHiveHashForTimestampType("9999-01-01 00:00:00", -1081818240)
+
+ // epoch
+ checkHiveHashForTimestampType("1970-01-01 00:00:00", 0)
+
+ // before epoch
+ checkHiveHashForTimestampType("1800-01-01 03:12:45", -267420885)
+
+ // Invalid input: bad timestamp string. Hive returns 0 for such cases
+ intercept[NoSuchElementException](checkHiveHashForTimestampType("0-0-0 0:0:0", 0))
+ intercept[NoSuchElementException](checkHiveHashForTimestampType("-99-99-99 99:99:45", 0))
+ intercept[NoSuchElementException](checkHiveHashForTimestampType("555555-55555-5555", 0))
+
+ // Invalid input: Empty string. Hive returns 0 for this case
+ intercept[NoSuchElementException](checkHiveHashForTimestampType("", 0))
+
+ // Invalid input: February 30th for a leap year. Hive supports this but Spark doesn't
+ intercept[NoSuchElementException](checkHiveHashForTimestampType("2016-02-30 00:00:00", 0))
+
+ // Invalid input: Hive accepts up to 9 decimal places of precision but Spark uses up to 6
+ intercept[TestFailedException](checkHiveHashForTimestampType("2017-02-24 10:56:29.11111111", 0))
+ }
+
+ test("hive-hash for CalendarInterval type") {
+ def checkHiveHashForIntervalType(interval: String, expected: Long): Unit = {
+ checkHiveHash(CalendarInterval.fromString(interval), CalendarIntervalType, expected)
+ }
+
+ // ----- MICROSEC -----
+
+ // basic case
+ checkHiveHashForIntervalType("interval 1 microsecond", 24273)
+
+ // negative
+ checkHiveHashForIntervalType("interval -1 microsecond", 22273)
+
+ // edge / boundary cases
+ checkHiveHashForIntervalType("interval 0 microsecond", 23273)
+ checkHiveHashForIntervalType("interval 999 microsecond", 1022273)
+ checkHiveHashForIntervalType("interval -999 microsecond", -975727)
+
+ // ----- MILLISEC -----
+
+ // basic case
+ checkHiveHashForIntervalType("interval 1 millisecond", 1023273)
+
+ // negative
+ checkHiveHashForIntervalType("interval -1 millisecond", -976727)
+
+ // edge / boundary cases
+ checkHiveHashForIntervalType("interval 0 millisecond", 23273)
+ checkHiveHashForIntervalType("interval 999 millisecond", 999023273)
+ checkHiveHashForIntervalType("interval -999 millisecond", -998976727)
+
+ // ----- SECOND -----
+
+ // basic case
+ checkHiveHashForIntervalType("interval 1 second", 23310)
+
+ // negative
+ checkHiveHashForIntervalType("interval -1 second", 23273)
+
+ // edge / boundary cases
+ checkHiveHashForIntervalType("interval 0 second", 23273)
+ checkHiveHashForIntervalType("interval 2147483647 second", -2147460412)
+ checkHiveHashForIntervalType("interval -2147483648 second", -2147460412)
+
+ // Out of range for both Hive and Spark
+ // Hive throws an exception. Spark overflows and returns wrong output
+ // checkHiveHashForIntervalType("interval 9999999999 second", 0)
+
+ // ----- MINUTE -----
+
+ // basic cases
+ checkHiveHashForIntervalType("interval 1 minute", 25493)
+
+ // negative
+ checkHiveHashForIntervalType("interval -1 minute", 25456)
+
+ // edge / boundary cases
+ checkHiveHashForIntervalType("interval 0 minute", 23273)
+ checkHiveHashForIntervalType("interval 2147483647 minute", 21830)
+ checkHiveHashForIntervalType("interval -2147483648 minute", 22163)
+
+ // Out of range for both Hive and Spark
+ // Hive throws an exception. Spark overflows and returns wrong output
+ // checkHiveHashForIntervalType("interval 9999999999 minute", 0)
+
+ // ----- HOUR -----
+
+ // basic case
+ checkHiveHashForIntervalType("interval 1 hour", 156473)
+
+ // negative
+ checkHiveHashForIntervalType("interval -1 hour", 156436)
+
+ // edge / boundary cases
+ checkHiveHashForIntervalType("interval 0 hour", 23273)
+ checkHiveHashForIntervalType("interval 2147483647 hour", -62308)
+ checkHiveHashForIntervalType("interval -2147483648 hour", -43327)
+
+ // Out of range for both Hive and Spark
+ // Hive throws an exception. Spark overflows and returns wrong output
+ // checkHiveHashForIntervalType("interval 9999999999 hour", 0)
+
+ // ----- DAY -----
+
+ // basic cases
+ checkHiveHashForIntervalType("interval 1 day", 3220073)
+
+ // negative
+ checkHiveHashForIntervalType("interval -1 day", 3220036)
+
+ // edge / boundary cases
+ checkHiveHashForIntervalType("interval 0 day", 23273)
+ checkHiveHashForIntervalType("interval 106751991 day", -451506760)
+ checkHiveHashForIntervalType("interval -106751991 day", -451514123)
+
+ // Hive supports `day` for a longer range but Spark's range is smaller
+ // The check for range is done at the parser level so this does not fail in Spark
+ // checkHiveHashForIntervalType("interval -2147483648 day", -1575127)
+ // checkHiveHashForIntervalType("interval 2147483647 day", -4767228)
+
+ // Out of range for both Hive and Spark
+ // Hive throws an exception. Spark overflows and returns wrong output
+ // checkHiveHashForIntervalType("interval 9999999999 day", 0)
+
+ // ----- MIX -----
+
+ checkHiveHashForIntervalType("interval 0 day 0 hour", 23273)
+ checkHiveHashForIntervalType("interval 0 day 0 hour 0 minute", 23273)
+ checkHiveHashForIntervalType("interval 0 day 0 hour 0 minute 0 second", 23273)
+ checkHiveHashForIntervalType("interval 0 day 0 hour 0 minute 0 second 0 millisecond", 23273)
+ checkHiveHashForIntervalType(
+ "interval 0 day 0 hour 0 minute 0 second 0 millisecond 0 microsecond", 23273)
+
+ checkHiveHashForIntervalType("interval 6 day 15 hour", 21202073)
+ checkHiveHashForIntervalType("interval 5 day 4 hour 8 minute", 16557833)
+ checkHiveHashForIntervalType("interval -23 day 56 hour -1111113 minute 9898989 second",
+ -2128468593)
+ checkHiveHashForIntervalType("interval 66 day 12 hour 39 minute 23 second 987 millisecond",
+ 1199697904)
+ checkHiveHashForIntervalType(
+ "interval 66 day 12 hour 39 minute 23 second 987 millisecond 123 microsecond", 1199820904)
+ }
+
test("hive-hash for array") {
// empty array
checkHiveHash(