diff options
author | Daoyuan Wang <daoyuan.wang@intel.com> | 2015-02-03 12:21:45 -0800 |
---|---|---|
committer | Michael Armbrust <michael@databricks.com> | 2015-02-03 12:21:45 -0800 |
commit | db821ed2ededf6ce79b838c77a9c10bed2ce555a (patch) | |
tree | d34b200923ca35a67163d179f16521ff03865648 /sql/catalyst | |
parent | 5adbb39482631998dbfe4a1da88f6e75b30fb5ac (diff) | |
download | spark-db821ed2ededf6ce79b838c77a9c10bed2ce555a.tar.gz spark-db821ed2ededf6ce79b838c77a9c10bed2ce555a.tar.bz2 spark-db821ed2ededf6ce79b838c77a9c10bed2ce555a.zip |
[SPARK-4508] [SQL] build native date type to conform behavior to Hive
The previous #3732 is reverted due to some test failure.
Have fixed that.
Author: Daoyuan Wang <daoyuan.wang@intel.com>
Closes #4325 from adrian-wang/datenative and squashes the following commits:
096e20d [Daoyuan Wang] fix for mixed timezone
0ed0fdc [Daoyuan Wang] fix test data
a2fdd4e [Daoyuan Wang] getDate
c37832b [Daoyuan Wang] row to catalyst
f0005b1 [Daoyuan Wang] add date in sql parser and java type conversion
024c9a6 [Daoyuan Wang] clean some import order
d6715fc [Daoyuan Wang] refactoring Date as Primitive Int internally
374abd5 [Daoyuan Wang] spark native date type support
Diffstat (limited to 'sql/catalyst')
10 files changed, 112 insertions, 61 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/Row.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/Row.scala index 41bb4f012f..3a70d25534 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/Row.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/Row.scala @@ -20,7 +20,7 @@ package org.apache.spark.sql import scala.util.hashing.MurmurHash3 import org.apache.spark.sql.catalyst.expressions.GenericRow - +import org.apache.spark.sql.types.DateUtils object Row { /** diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala index e0db587efb..8e79e532ca 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala @@ -17,14 +17,13 @@ package org.apache.spark.sql.catalyst -import java.sql.{Date, Timestamp} +import java.sql.Timestamp import org.apache.spark.util.Utils import org.apache.spark.sql.catalyst.expressions.{GenericRow, Attribute, AttributeReference, Row} import org.apache.spark.sql.catalyst.plans.logical.LocalRelation import org.apache.spark.sql.types._ - /** * A default version of ScalaReflection that uses the runtime universe. */ @@ -72,6 +71,7 @@ trait ScalaReflection { }.toArray) case (d: BigDecimal, _) => Decimal(d) case (d: java.math.BigDecimal, _) => Decimal(d) + case (d: java.sql.Date, _) => DateUtils.fromJavaDate(d) case (other, _) => other } @@ -85,6 +85,7 @@ trait ScalaReflection { } case (r: Row, s: StructType) => convertRowToScala(r, s) case (d: Decimal, _: DecimalType) => d.toJavaBigDecimal + case (i: Int, DateType) => DateUtils.toJavaDate(i) case (other, _) => other } @@ -159,7 +160,7 @@ trait ScalaReflection { valueDataType, valueContainsNull = valueNullable), nullable = true) case t if t <:< typeOf[String] => Schema(StringType, nullable = true) case t if t <:< typeOf[Timestamp] => Schema(TimestampType, nullable = true) - case t if t <:< typeOf[Date] => Schema(DateType, nullable = true) + case t if t <:< typeOf[java.sql.Date] => Schema(DateType, nullable = true) case t if t <:< typeOf[BigDecimal] => Schema(DecimalType.Unlimited, nullable = true) case t if t <:< typeOf[java.math.BigDecimal] => Schema(DecimalType.Unlimited, nullable = true) case t if t <:< typeOf[Decimal] => Schema(DecimalType.Unlimited, nullable = true) @@ -191,7 +192,7 @@ trait ScalaReflection { case obj: LongType.JvmType => LongType case obj: FloatType.JvmType => FloatType case obj: DoubleType.JvmType => DoubleType - case obj: DateType.JvmType => DateType + case obj: java.sql.Date => DateType case obj: java.math.BigDecimal => DecimalType.Unlimited case obj: Decimal => DecimalType.Unlimited case obj: TimestampType.JvmType => TimestampType diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala index 25e639d390..5c006e9d4c 100755 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala @@ -52,6 +52,7 @@ class SqlParser extends AbstractSparkSQLParser { protected val CAST = Keyword("CAST") protected val COALESCE = Keyword("COALESCE") protected val COUNT = Keyword("COUNT") + protected val DATE = Keyword("DATE") protected val DECIMAL = Keyword("DECIMAL") protected val DESC = Keyword("DESC") protected val DISTINCT = Keyword("DISTINCT") @@ -383,6 +384,7 @@ class SqlParser extends AbstractSparkSQLParser { | DOUBLE ^^^ DoubleType | fixedDecimalType | DECIMAL ^^^ DecimalType.Unlimited + | DATE ^^^ DateType ) protected lazy val fixedDecimalType: Parser[DataType] = diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala index ece5ee7361..b1bc858478 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala @@ -113,7 +113,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w // UDFToString private[this] def castToString(from: DataType): Any => Any = from match { case BinaryType => buildCast[Array[Byte]](_, new String(_, "UTF-8")) - case DateType => buildCast[Date](_, dateToString) + case DateType => buildCast[Int](_, d => DateUtils.toString(d)) case TimestampType => buildCast[Timestamp](_, timestampToString) case _ => buildCast[Any](_, _.toString) } @@ -131,7 +131,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w buildCast[Timestamp](_, t => t.getTime() != 0 || t.getNanos() != 0) case DateType => // Hive would return null when cast from date to boolean - buildCast[Date](_, d => null) + buildCast[Int](_, d => null) case LongType => buildCast[Long](_, _ != 0) case IntegerType => @@ -171,7 +171,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w case ByteType => buildCast[Byte](_, b => new Timestamp(b)) case DateType => - buildCast[Date](_, d => new Timestamp(d.getTime)) + buildCast[Int](_, d => new Timestamp(DateUtils.toJavaDate(d).getTime)) // TimestampWritable.decimalToTimestamp case DecimalType() => buildCast[Decimal](_, d => decimalToTimestamp(d)) @@ -224,37 +224,24 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w } } - // Converts Timestamp to string according to Hive TimestampWritable convention - private[this] def timestampToDateString(ts: Timestamp): String = { - Cast.threadLocalDateFormat.get.format(ts) - } - // DateConverter private[this] def castToDate(from: DataType): Any => Any = from match { case StringType => buildCast[String](_, s => - try Date.valueOf(s) catch { case _: java.lang.IllegalArgumentException => null }) + try DateUtils.fromJavaDate(Date.valueOf(s)) + catch { case _: java.lang.IllegalArgumentException => null } + ) case TimestampType => // throw valid precision more than seconds, according to Hive. // Timestamp.nanos is in 0 to 999,999,999, no more than a second. - buildCast[Timestamp](_, t => new Date(Math.floor(t.getTime / 1000.0).toLong * 1000)) + buildCast[Timestamp](_, t => DateUtils.millisToDays(t.getTime)) // Hive throws this exception as a Semantic Exception - // It is never possible to compare result when hive return with exception, so we can return null + // It is never possible to compare result when hive return with exception, + // so we can return null // NULL is more reasonable here, since the query itself obeys the grammar. case _ => _ => null } - // Date cannot be cast to long, according to hive - private[this] def dateToLong(d: Date) = null - - // Date cannot be cast to double, according to hive - private[this] def dateToDouble(d: Date) = null - - // Converts Date to string according to Hive DateWritable convention - private[this] def dateToString(d: Date): String = { - Cast.threadLocalDateFormat.get.format(d) - } - // LongConverter private[this] def castToLong(from: DataType): Any => Any = from match { case StringType => @@ -264,7 +251,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w case BooleanType => buildCast[Boolean](_, b => if (b) 1L else 0L) case DateType => - buildCast[Date](_, d => dateToLong(d)) + buildCast[Int](_, d => null) case TimestampType => buildCast[Timestamp](_, t => timestampToLong(t)) case x: NumericType => @@ -280,7 +267,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w case BooleanType => buildCast[Boolean](_, b => if (b) 1 else 0) case DateType => - buildCast[Date](_, d => dateToLong(d)) + buildCast[Int](_, d => null) case TimestampType => buildCast[Timestamp](_, t => timestampToLong(t).toInt) case x: NumericType => @@ -296,7 +283,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w case BooleanType => buildCast[Boolean](_, b => if (b) 1.toShort else 0.toShort) case DateType => - buildCast[Date](_, d => dateToLong(d)) + buildCast[Int](_, d => null) case TimestampType => buildCast[Timestamp](_, t => timestampToLong(t).toShort) case x: NumericType => @@ -312,7 +299,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w case BooleanType => buildCast[Boolean](_, b => if (b) 1.toByte else 0.toByte) case DateType => - buildCast[Date](_, d => dateToLong(d)) + buildCast[Int](_, d => null) case TimestampType => buildCast[Timestamp](_, t => timestampToLong(t).toByte) case x: NumericType => @@ -342,7 +329,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w case BooleanType => buildCast[Boolean](_, b => changePrecision(if (b) Decimal(1) else Decimal(0), target)) case DateType => - buildCast[Date](_, d => null) // date can't cast to decimal in Hive + buildCast[Int](_, d => null) // date can't cast to decimal in Hive case TimestampType => // Note that we lose precision here. buildCast[Timestamp](_, t => changePrecision(Decimal(timestampToDouble(t)), target)) @@ -367,7 +354,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w case BooleanType => buildCast[Boolean](_, b => if (b) 1d else 0d) case DateType => - buildCast[Date](_, d => dateToDouble(d)) + buildCast[Int](_, d => null) case TimestampType => buildCast[Timestamp](_, t => timestampToDouble(t)) case x: NumericType => @@ -383,7 +370,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w case BooleanType => buildCast[Boolean](_, b => if (b) 1f else 0f) case DateType => - buildCast[Date](_, d => dateToDouble(d)) + buildCast[Int](_, d => null) case TimestampType => buildCast[Timestamp](_, t => timestampToDouble(t).toFloat) case x: NumericType => @@ -442,16 +429,16 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w object Cast { // `SimpleDateFormat` is not thread-safe. - private[sql] val threadLocalDateFormat = new ThreadLocal[DateFormat] { + private[sql] val threadLocalTimestampFormat = new ThreadLocal[DateFormat] { override def initialValue() = { - new SimpleDateFormat("yyyy-MM-dd") + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss") } } // `SimpleDateFormat` is not thread-safe. - private[sql] val threadLocalTimestampFormat = new ThreadLocal[DateFormat] { + private[sql] val threadLocalDateFormat = new ThreadLocal[DateFormat] { override def initialValue() = { - new SimpleDateFormat("yyyy-MM-dd HH:mm:ss") + new SimpleDateFormat("yyyy-MM-dd") } } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala index 4cae5c4718..1f80d84b74 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala @@ -246,6 +246,9 @@ abstract class CodeGenerator[InType <: AnyRef, OutType <: AnyRef] extends Loggin new String(${eval.primitiveTerm}.asInstanceOf[Array[Byte]]) """.children + case Cast(child @ DateType(), StringType) => + child.castOrNull(c => q"org.apache.spark.sql.types.DateUtils.toString($c)", StringType) + case Cast(child @ NumericType(), IntegerType) => child.castOrNull(c => q"$c.toInt", IntegerType) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala index 5b389aad7a..97bb96f48e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala @@ -35,7 +35,7 @@ object Literal { case d: java.math.BigDecimal => Literal(Decimal(d), DecimalType.Unlimited) case d: Decimal => Literal(d, DecimalType.Unlimited) case t: Timestamp => Literal(t, TimestampType) - case d: Date => Literal(d, DateType) + case d: Date => Literal(DateUtils.fromJavaDate(d), DateType) case a: Array[Byte] => Literal(a, BinaryType) case null => Literal(null, NullType) } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DateUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DateUtils.scala new file mode 100644 index 0000000000..8a1a3b81b3 --- /dev/null +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DateUtils.scala @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.types + +import java.sql.Date +import java.util.{Calendar, TimeZone} + +import org.apache.spark.sql.catalyst.expressions.Cast + +/** + * helper function to convert between Int value of days since 1970-01-01 and java.sql.Date + */ +object DateUtils { + private val MILLIS_PER_DAY = 86400000 + + // Java TimeZone has no mention of thread safety. Use thread local instance to be safe. + private val LOCAL_TIMEZONE = new ThreadLocal[TimeZone] { + override protected def initialValue: TimeZone = { + Calendar.getInstance.getTimeZone + } + } + + private def javaDateToDays(d: Date): Int = { + millisToDays(d.getTime) + } + + def millisToDays(millisLocal: Long): Int = { + ((millisLocal + LOCAL_TIMEZONE.get().getOffset(millisLocal)) / MILLIS_PER_DAY).toInt + } + + private def toMillisSinceEpoch(days: Int): Long = { + val millisUtc = days.toLong * MILLIS_PER_DAY + millisUtc - LOCAL_TIMEZONE.get().getOffset(millisUtc) + } + + def fromJavaDate(date: java.sql.Date): Int = { + javaDateToDays(date) + } + + def toJavaDate(daysSinceEpoch: Int): java.sql.Date = { + new java.sql.Date(toMillisSinceEpoch(daysSinceEpoch)) + } + + def toString(days: Int): String = Cast.threadLocalDateFormat.get.format(toJavaDate(days)) +} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/dataTypes.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/dataTypes.scala index defdcb2b70..4825d1ff81 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/dataTypes.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/dataTypes.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql.types -import java.sql.{Date, Timestamp} +import java.sql.Timestamp import scala.math.Numeric.{FloatAsIfIntegral, DoubleAsIfIntegral} import scala.reflect.ClassTag @@ -387,18 +387,16 @@ case object TimestampType extends NativeType { */ @DeveloperApi case object DateType extends NativeType { - private[sql] type JvmType = Date + private[sql] type JvmType = Int @transient private[sql] lazy val tag = ScalaReflectionLock.synchronized { typeTag[JvmType] } - private[sql] val ordering = new Ordering[JvmType] { - def compare(x: Date, y: Date) = x.compareTo(y) - } + private[sql] val ordering = implicitly[Ordering[JvmType]] /** - * The default size of a value of the DateType is 8 bytes. + * The default size of a value of the DateType is 4 bytes. */ - override def defaultSize: Int = 8 + override def defaultSize: Int = 4 } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvaluationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvaluationSuite.scala index 37e64adeea..25d1c105a0 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvaluationSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvaluationSuite.scala @@ -303,6 +303,7 @@ class ExpressionEvaluationSuite extends FunSuite { val sd = "1970-01-01" val d = Date.valueOf(sd) + val zts = sd + " 00:00:00" val sts = sd + " 00:00:02" val nts = sts + ".1" val ts = Timestamp.valueOf(nts) @@ -319,14 +320,14 @@ class ExpressionEvaluationSuite extends FunSuite { checkEvaluation(Cast(Literal(1.toDouble) cast TimestampType, DoubleType), 1.toDouble) checkEvaluation(Cast(Literal(sd) cast DateType, StringType), sd) - checkEvaluation(Cast(Literal(d) cast StringType, DateType), d) + checkEvaluation(Cast(Literal(d) cast StringType, DateType), 0) checkEvaluation(Cast(Literal(nts) cast TimestampType, StringType), nts) checkEvaluation(Cast(Literal(ts) cast StringType, TimestampType), ts) // all convert to string type to check checkEvaluation( Cast(Cast(Literal(nts) cast TimestampType, DateType), StringType), sd) checkEvaluation( - Cast(Cast(Literal(ts) cast DateType, TimestampType), StringType), sts) + Cast(Cast(Literal(ts) cast DateType, TimestampType), StringType), zts) checkEvaluation(Cast("abdef" cast BinaryType, StringType), "abdef") @@ -377,8 +378,8 @@ class ExpressionEvaluationSuite extends FunSuite { } test("date") { - val d1 = Date.valueOf("1970-01-01") - val d2 = Date.valueOf("1970-01-02") + val d1 = DateUtils.fromJavaDate(Date.valueOf("1970-01-01")) + val d2 = DateUtils.fromJavaDate(Date.valueOf("1970-01-02")) checkEvaluation(Literal(d1) < Literal(d2), true) } @@ -459,22 +460,21 @@ class ExpressionEvaluationSuite extends FunSuite { test("date casting") { val d = Date.valueOf("1970-01-01") - checkEvaluation(Cast(d, ShortType), null) - checkEvaluation(Cast(d, IntegerType), null) - checkEvaluation(Cast(d, LongType), null) - checkEvaluation(Cast(d, FloatType), null) - checkEvaluation(Cast(d, DoubleType), null) - checkEvaluation(Cast(d, DecimalType.Unlimited), null) - checkEvaluation(Cast(d, DecimalType(10, 2)), null) - checkEvaluation(Cast(d, StringType), "1970-01-01") - checkEvaluation(Cast(Cast(d, TimestampType), StringType), "1970-01-01 00:00:00") + checkEvaluation(Cast(Literal(d), ShortType), null) + checkEvaluation(Cast(Literal(d), IntegerType), null) + checkEvaluation(Cast(Literal(d), LongType), null) + checkEvaluation(Cast(Literal(d), FloatType), null) + checkEvaluation(Cast(Literal(d), DoubleType), null) + checkEvaluation(Cast(Literal(d), DecimalType.Unlimited), null) + checkEvaluation(Cast(Literal(d), DecimalType(10, 2)), null) + checkEvaluation(Cast(Literal(d), StringType), "1970-01-01") + checkEvaluation(Cast(Cast(Literal(d), TimestampType), StringType), "1970-01-01 00:00:00") } test("timestamp casting") { val millis = 15 * 1000 + 2 val seconds = millis * 1000 + 2 val ts = new Timestamp(millis) - val ts1 = new Timestamp(15 * 1000) // a timestamp without the milliseconds part val tss = new Timestamp(seconds) checkEvaluation(Cast(ts, ShortType), 15) checkEvaluation(Cast(ts, IntegerType), 15) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/types/DataTypeSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/types/DataTypeSuite.scala index c147be9f6b..7bcd6687d1 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/types/DataTypeSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/types/DataTypeSuite.scala @@ -106,7 +106,7 @@ class DataTypeSuite extends FunSuite { checkDefaultSize(DoubleType, 8) checkDefaultSize(DecimalType(10, 5), 4096) checkDefaultSize(DecimalType.Unlimited, 4096) - checkDefaultSize(DateType, 8) + checkDefaultSize(DateType, 4) checkDefaultSize(TimestampType, 8) checkDefaultSize(StringType, 4096) checkDefaultSize(BinaryType, 4096) |